Author: ddas
Date: Tue Aug 18 08:54:21 2009
New Revision: 805324
URL: http://svn.apache.org/viewvc?rev=805324&view=rev
Log:
MAPREDUCE-773. Sends progress reports for compressed gzip inputs in maps. Fixes a native direct
buffer leak in LineRecordReader classes. Contributed by Hong Tang and Devaraj Das.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=805324&r1=805323&r2=805324&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Aug 18 08:54:21 2009
@@ -405,3 +405,7 @@
MAPREDUCE-852. In build.xml, remove the Main-Class, which is incorrectly
set in tools, and rename the target "tools-jar" to "tools". (szetszwo)
+
+ MAPREDUCE-773. Sends progress reports for compressed gzip inputs in maps.
+ Fixes a native direct buffer leak in LineRecordReader classes.
+ (Hong Tang and ddas)
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?rev=805324&r1=805323&r2=805324&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Tue Aug 18 08:54:21 2009
@@ -90,7 +90,7 @@
return true;
}
- public float getProgress() {
+ public float getProgress() throws IOException {
+ // Delegates to the wrapped lineRecordReader; IOException is declared so
+ // that a failure from the delegate's getProgress() propagates to the caller.
return lineRecordReader.getProgress();
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=805324&r1=805323&r2=805324&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/LineRecordReader.java Tue Aug 18 08:54:21 2009
@@ -27,8 +27,10 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
@@ -47,7 +49,10 @@
private long pos;
private long end;
private LineReader in;
+ private FSDataInputStream fileIn;
int maxLineLength;
+ private CompressionCodec codec;
+ private Decompressor decompressor;
/**
* A class that provides a line reader from an input stream.
@@ -74,14 +79,14 @@
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
- final CompressionCodec codec = compressionCodecs.getCodec(file);
+ codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
- FSDataInputStream fileIn = fs.open(split.getPath());
- if (codec != null) {
- in = new LineReader(codec.createInputStream(fileIn), job);
- end = Long.MAX_VALUE;
+ fileIn = fs.open(split.getPath());
+ if (isCompressedInput()) {
+ decompressor = CodecPool.getDecompressor(codec);
+ in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
} else {
fileIn.seek(start);
in = new LineReader(fileIn, job);
@@ -90,8 +95,7 @@
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
- start += in.readLine(new Text(), 0, (int) Math.min(
- (long) Integer.MAX_VALUE, end - start));
+ start += in.readLine(new Text(), 0, maxBytesToConsume());
}
this.pos = start;
}
@@ -124,18 +128,34 @@
return new Text();
}
+ /** True when compressionCodecs.getCodec(file) returned a codec for the input file. */
+ private boolean isCompressedInput() { return (codec != null); }
+
+ /**
+ * Byte limit passed to in.readLine(): Integer.MAX_VALUE for compressed
+ * input, otherwise the split length (end - start) capped at Integer.MAX_VALUE.
+ */
+ // NOTE(review): the pre-patch next() loop bounded readLine by (end - pos);
+ // this helper always uses (end - start), so the bound no longer shrinks as
+ // the reader advances through the split. Confirm this change is intended.
+ private int maxBytesToConsume() {
+ return (isCompressedInput()) ? Integer.MAX_VALUE
+ : (int) Math.min(Integer.MAX_VALUE, (end - start));
+ }
+ /**
+ * Position used by next() for the split-boundary check and by getProgress().
+ * For compressed input this is fileIn.getPos(), the offset in the raw
+ * (compressed) file stream; otherwise it is the reader's own pos counter.
+ */
+ // NOTE(review): the codec stream may read ahead of the lines already
+ // returned, so for compressed input this presumably runs ahead of the
+ // logical record position -- confirm it is acceptable for progress.
+ private long getFilePosition() throws IOException {
+ long retVal;
+ if (isCompressedInput()) {
+ retVal = fileIn.getPos();
+ } else {
+ retVal = pos;
+ }
+ return retVal;
+ }
+
+
/** Read a line. */
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
- while (pos <= end) {
+ while (getFilePosition() <= end) {
key.set(pos);
int newSize = in.readLine(value, maxLineLength,
- Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
- maxLineLength));
+ Math.max(maxBytesToConsume(), maxLineLength));
if (newSize == 0) {
return false;
}
@@ -154,11 +174,11 @@
/**
* Get the progress within the split
*/
- public synchronized float getProgress() {
+ public synchronized float getProgress() throws IOException {
+ // Returns 0 for an empty split; otherwise (position - start) / (end - start)
+ // clamped to 1.0. The position comes from getFilePosition(), which for
+ // compressed input is the byte offset in the underlying file stream, hence
+ // the new IOException in the signature.
if (start == end) {
return 0.0f;
} else {
- return Math.min(1.0f, (pos - start) / (float)(end - start));
+ return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
}
}
@@ -167,8 +187,14 @@
}
+ /**
+ * Closes the line reader and, even if that fails, returns the decompressor
+ * (if any) to the CodecPool. The field is cleared once returned so that a
+ * repeated close() does not return the same instance to the pool twice.
+ */
public synchronized void close() throws IOException {
- if (in != null) {
- in.close();
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ // Clear the reference so a second close() skips the return.
+ decompressor = null;
+ }
}
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java?rev=805324&r1=805323&r2=805324&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/KeyValueLineRecordReader.java Tue Aug 18 08:54:21 2009
@@ -118,7 +118,7 @@
return value;
}
- public float getProgress() {
+ public float getProgress() throws IOException {
+ // Delegates to the wrapped lineRecordReader; IOException is declared so
+ // that a failure from the delegate's getProgress() propagates to the caller.
return lineRecordReader.getProgress();
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=805324&r1=805323&r2=805324&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Tue Aug 18 08:54:21 2009
@@ -26,8 +26,10 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -48,10 +50,13 @@
private long pos;
private long end;
private LineReader in;
+ private FSDataInputStream fileIn;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;
private Counter inputByteCounter;
+ private CompressionCodec codec;
+ private Decompressor decompressor;
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
@@ -65,14 +70,14 @@
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
- final CompressionCodec codec = compressionCodecs.getCodec(file);
+ codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
- FSDataInputStream fileIn = fs.open(split.getPath());
- if (codec != null) {
- in = new LineReader(codec.createInputStream(fileIn), job);
- end = Long.MAX_VALUE;
+ fileIn = fs.open(split.getPath());
+ if (isCompressedInput()) {
+ decompressor = CodecPool.getDecompressor(codec);
+ in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
} else {
fileIn.seek(start);
in = new LineReader(fileIn, job);
@@ -81,12 +86,27 @@
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
- start += in.readLine(new Text(), 0, (int) Math.min(
- (long) Integer.MAX_VALUE, end - start));
+ start += in.readLine(new Text(), 0, maxBytesToConsume());
}
this.pos = start;
}
+ /** True when compressionCodecs.getCodec(file) returned a codec for the input file. */
+ private boolean isCompressedInput() { return (codec != null); }
+
+ /**
+ * Byte limit passed to in.readLine(): Integer.MAX_VALUE for compressed
+ * input, otherwise the split length (end - start) capped at Integer.MAX_VALUE.
+ */
+ // NOTE(review): the pre-patch nextKeyValue() loop bounded readLine by
+ // (end - pos); this helper always uses (end - start), so the bound no longer
+ // shrinks as the reader advances through the split. Confirm this is intended.
+ private int maxBytesToConsume() {
+ return (isCompressedInput()) ? Integer.MAX_VALUE
+ : (int) Math.min(Integer.MAX_VALUE, (end - start));
+ }
+
+ /**
+ * Position used by nextKeyValue() for the split-boundary check and by
+ * getProgress(). For compressed input this is fileIn.getPos(), the offset in
+ * the raw (compressed) file stream; otherwise it is the reader's pos counter.
+ */
+ // NOTE(review): the codec stream may read ahead of the lines already
+ // returned, so for compressed input this presumably runs ahead of the
+ // logical record position -- confirm it is acceptable for progress.
+ private long getFilePosition() throws IOException {
+ long retVal;
+ if (isCompressedInput()) {
+ retVal = fileIn.getPos();
+ } else {
+ retVal = pos;
+ }
+ return retVal;
+ }
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
@@ -98,10 +118,9 @@
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
- while (pos <= end) {
+ while (getFilePosition() <= end) {
newSize = in.readLine(value, maxLineLength,
- Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
- maxLineLength));
+ Math.max(maxBytesToConsume(), maxLineLength));
if (newSize == 0) {
break;
}
@@ -137,17 +156,23 @@
/**
* Get the progress within the split
*/
- public float getProgress() {
+ public float getProgress() throws IOException {
+ // Returns 0 for an empty split; otherwise (position - start) / (end - start)
+ // clamped to 1.0. The position comes from getFilePosition(), which for
+ // compressed input is the byte offset in the underlying file stream, hence
+ // the new IOException in the signature.
if (start == end) {
return 0.0f;
} else {
- return Math.min(1.0f, (pos - start) / (float)(end - start));
+ return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
}
}
+ /**
+ * Closes the line reader and, even if that fails, returns the decompressor
+ * (if any) to the CodecPool. The field is cleared once returned so that a
+ * repeated close() does not return the same instance to the pool twice.
+ */
public synchronized void close() throws IOException {
- if (in != null) {
- in.close();
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ // Clear the reference so a second close() skips the return.
+ decompressor = null;
+ }
}
}
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=805324&r1=805323&r2=805324&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Aug 18 08:54:21 2009
@@ -100,7 +100,7 @@
private float last = 0.0f;
private boolean progressCalled = false;
@Override
- public float getProgress() {
+ public float getProgress() throws IOException {
progressCalled = true;
final float ret = super.getProgress();
assertTrue("getProgress decreased", ret >= last);
|