hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r805324 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/lib/input/ src/test/mapred/org/apache/hadoop/mapreduce/
Date Tue, 18 Aug 2009 08:54:22 GMT
Author: ddas
Date: Tue Aug 18 08:54:21 2009
New Revision: 805324

URL: http://svn.apache.org/viewvc?rev=805324&view=rev
Log:
MAPREDUCE-773. Sends progress reports for compressed gzip inputs in maps. Fixes a native direct
buffer leak in LineRecordReader classes. Contributed by Hong Tang and Devaraj Das.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=805324&r1=805323&r2=805324&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Aug 18 08:54:21 2009
@@ -405,3 +405,7 @@
 
     MAPREDUCE-852. In build.xml, remove the Main-Class, which is incorrectly
     set in tools, and rename the target "tools-jar" to "tools".  (szetszwo)
+
+    MAPREDUCE-773. Sends progress reports for compressed gzip inputs in maps.
+    Fixes a native direct buffer leak in LineRecordReader classes.
+    (Hong Tang and ddas)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?rev=805324&r1=805323&r2=805324&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Tue Aug 18 08:54:21 2009
@@ -90,7 +90,7 @@
     return true;
   }
   
-  public float getProgress() {
+  public float getProgress() throws IOException {
     return lineRecordReader.getProgress();
   }
   

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=805324&r1=805323&r2=805324&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java Tue Aug 18 08:54:21 2009
@@ -27,8 +27,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -47,7 +49,10 @@
   private long pos;
   private long end;
   private LineReader in;
+  private FSDataInputStream fileIn;
   int maxLineLength;
+  private CompressionCodec codec;
+  private Decompressor decompressor;
 
   /**
    * A class that provides a line reader from an input stream.
@@ -74,14 +79,14 @@
     end = start + split.getLength();
     final Path file = split.getPath();
     compressionCodecs = new CompressionCodecFactory(job);
-    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
     FileSystem fs = file.getFileSystem(job);
-    FSDataInputStream fileIn = fs.open(split.getPath());
-    if (codec != null) {
-      in = new LineReader(codec.createInputStream(fileIn), job);
-      end = Long.MAX_VALUE;
+    fileIn = fs.open(split.getPath());
+    if (isCompressedInput()) {
+      decompressor = CodecPool.getDecompressor(codec);
+      in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
     } else {
       fileIn.seek(start);
       in = new LineReader(fileIn, job);
@@ -90,8 +95,7 @@
     // because we always (except the last split) read one extra line in
     // next() method.
     if (start != 0) {
-      start += in.readLine(new Text(), 0, (int) Math.min(
-          (long) Integer.MAX_VALUE, end - start));
+      start += in.readLine(new Text(), 0, maxBytesToConsume());
     }
     this.pos = start;
   }
@@ -124,18 +128,34 @@
     return new Text();
   }
   
+  private boolean isCompressedInput() { return (codec != null); }
+  
+  private int maxBytesToConsume() {
+    return (isCompressedInput()) ? Integer.MAX_VALUE
+                           : (int) Math.min(Integer.MAX_VALUE, (end - start));
+  }
+  private long getFilePosition() throws IOException {
+    long retVal;
+    if (isCompressedInput()) {
+      retVal = fileIn.getPos();
+    } else {
+      retVal = pos;
+    }
+    return retVal;
+  }
+
+  
   /** Read a line. */
   public synchronized boolean next(LongWritable key, Text value)
     throws IOException {
 
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
-    while (pos <= end) {
+    while (getFilePosition() <= end) {
       key.set(pos);
 
       int newSize = in.readLine(value, maxLineLength,
-                                Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
-                                         maxLineLength));
+                                Math.max(maxBytesToConsume(), maxLineLength));
       if (newSize == 0) {
         return false;
       }
@@ -154,11 +174,11 @@
   /**
    * Get the progress within the split
    */
-  public synchronized float getProgress() {
+  public synchronized float getProgress() throws IOException {
     if (start == end) {
       return 0.0f;
     } else {
-      return Math.min(1.0f, (pos - start) / (float)(end - start));
+      return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
     }
   }
   
@@ -167,8 +187,14 @@
   }
 
   public synchronized void close() throws IOException {
-    if (in != null) {
-      in.close(); 
+    try {
+      if (in != null) {
+        in.close();
+      }
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
     }
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java?rev=805324&r1=805323&r2=805324&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java Tue Aug 18 08:54:21 2009
@@ -118,7 +118,7 @@
     return value;
   }
 
-  public float getProgress() {
+  public float getProgress() throws IOException {
     return lineRecordReader.getProgress();
   }
   

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=805324&r1=805323&r2=805324&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Tue Aug 18 08:54:21 2009
@@ -26,8 +26,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -48,10 +50,13 @@
   private long pos;
   private long end;
   private LineReader in;
+  private FSDataInputStream fileIn;
   private int maxLineLength;
   private LongWritable key = null;
   private Text value = null;
   private Counter inputByteCounter;
+  private CompressionCodec codec;
+  private Decompressor decompressor;
 
   public void initialize(InputSplit genericSplit,
                          TaskAttemptContext context) throws IOException {
@@ -65,14 +70,14 @@
     end = start + split.getLength();
     final Path file = split.getPath();
     compressionCodecs = new CompressionCodecFactory(job);
-    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
     FileSystem fs = file.getFileSystem(job);
-    FSDataInputStream fileIn = fs.open(split.getPath());
-    if (codec != null) {
-      in = new LineReader(codec.createInputStream(fileIn), job);
-      end = Long.MAX_VALUE;
+    fileIn = fs.open(split.getPath());
+    if (isCompressedInput()) {
+      decompressor = CodecPool.getDecompressor(codec);
+      in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
     } else {
       fileIn.seek(start);
       in = new LineReader(fileIn, job);
@@ -81,12 +86,27 @@
     // because we always (except the last split) read one extra line in
     // next() method.
     if (start != 0) {
-      start += in.readLine(new Text(), 0, (int) Math.min(
-          (long) Integer.MAX_VALUE, end - start));
+      start += in.readLine(new Text(), 0, maxBytesToConsume());
     }
     this.pos = start;
   }
   
+  private boolean isCompressedInput() { return (codec != null); }
+  
+  private int maxBytesToConsume() {
+    return (isCompressedInput()) ? Integer.MAX_VALUE
+                           : (int) Math.min(Integer.MAX_VALUE, (end - start));
+  }
+  
+  private long getFilePosition() throws IOException {
+    long retVal;
+    if (isCompressedInput()) {
+      retVal = fileIn.getPos();
+    } else {
+      retVal = pos;
+    }
+    return retVal;
+  }
   public boolean nextKeyValue() throws IOException {
     if (key == null) {
       key = new LongWritable();
@@ -98,10 +118,9 @@
     int newSize = 0;
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
-    while (pos <= end) {
+    while (getFilePosition() <= end) {
       newSize = in.readLine(value, maxLineLength,
-                            Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
-                                     maxLineLength));
+                            Math.max(maxBytesToConsume(), maxLineLength));
       if (newSize == 0) {
         break;
       }
@@ -137,17 +156,23 @@
   /**
    * Get the progress within the split
    */
-  public float getProgress() {
+  public float getProgress() throws IOException {
     if (start == end) {
       return 0.0f;
     } else {
-      return Math.min(1.0f, (pos - start) / (float)(end - start));
+      return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
     }
   }
   
   public synchronized void close() throws IOException {
-    if (in != null) {
-      in.close(); 
+    try {
+      if (in != null) {
+        in.close();
+      }
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
     }
   }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=805324&r1=805323&r2=805324&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Aug 18 08:54:21 2009
@@ -100,7 +100,7 @@
       private float last = 0.0f;
       private boolean progressCalled = false;
       @Override
-      public float getProgress() {
+      public float getProgress() throws IOException {
         progressCalled = true;
         final float ret = super.getProgress();
         assertTrue("getProgress decreased", ret >= last);



Mime
View raw message