Author: tucu
Date: Wed Aug 15 23:11:00 2012
New Revision: 1373671
URL: http://svn.apache.org/viewvc?rev=1373671&view=rev
Log:
MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1373671&r1=1373670&r2=1373671&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Aug 15 23:11:00 2012
@@ -16,6 +16,8 @@ Branch-2 ( Unreleased changes )
MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved
(Jason Lowe via bobby)
+ MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
+
BUG FIXES
MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java?rev=1373671&r1=1373670&r2=1373671&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java Wed Aug 15 23:11:00 2012
@@ -340,7 +340,7 @@ public class IFile {
CompressionCodec codec,
Counters.Counter readsCounter) throws IOException {
readRecordsCounter = readsCounter;
- checksumIn = new IFileInputStream(in,length);
+ checksumIn = new IFileInputStream(in,length, conf);
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java?rev=1373671&r1=1373670&r2=1373671&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java Wed Aug 15 23:11:00 2012
@@ -19,13 +19,22 @@
package org.apache.hadoop.mapred;
import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.util.DataChecksum;
/**
* A checksum input stream, used for IFiles.
@@ -35,7 +44,8 @@ import org.apache.hadoop.util.DataChecks
@InterfaceStability.Unstable
public class IFileInputStream extends InputStream {
- private final InputStream in; //The input stream to be verified for checksum.
+ private final InputStream in; //The input stream to be verified for checksum.
+ private final FileDescriptor inFd; // the file descriptor, if it is known
private final long length; //The total length of the input file
private final long dataLength;
private DataChecksum sum;
@@ -43,7 +53,14 @@ public class IFileInputStream extends In
private final byte b[] = new byte[1];
private byte csum[] = null;
private int checksumSize;
-
+
+ private ReadaheadRequest curReadahead = null;
+ private ReadaheadPool raPool = ReadaheadPool.getInstance();
+ private boolean readahead;
+ private int readaheadLength;
+
+ public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
private boolean disableChecksumValidation = false;
/**
@@ -51,13 +68,36 @@ public class IFileInputStream extends In
* @param in The input stream to be verified for checksum.
* @param len The length of the input stream including checksum bytes.
*/
- public IFileInputStream(InputStream in, long len) {
+ public IFileInputStream(InputStream in, long len, Configuration conf) {
this.in = in;
+ // Null when the stream is not backed by a visible file descriptor;
+ // doReadahead() then does nothing.
+ this.inFd = getFileDescriptorIfAvail(in);
sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
Integer.MAX_VALUE);
checksumSize = sum.getChecksumSize();
length = len;
dataLength = length - checksumSize;
+
+ // conf may be null; fall back to a default Configuration so the
+ // readahead settings below still resolve to their defaults.
+ conf = (conf != null) ? conf : new Configuration();
+ readahead = conf.getBoolean(MRConfig.MAPRED_IFILE_READAHEAD,
+ MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD);
+ readaheadLength = conf.getInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES,
+ MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
+
+ // Issue the first readahead hint before any read() call (no-op when
+ // readahead is disabled or no descriptor is available).
+ doReadahead();
+ }
+
+ /**
+ * Best-effort lookup of the file descriptor underlying {@code in}.
+ *
+ * @param in the stream to inspect
+ * @return the stream's descriptor, or null when none can be obtained
+ */
+ private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+ try {
+ if (in instanceof HasFileDescriptor) {
+ return ((HasFileDescriptor) in).getFileDescriptor();
+ }
+ if (in instanceof FileInputStream) {
+ return ((FileInputStream) in).getFD();
+ }
+ } catch (IOException e) {
+ // Not fatal: without a descriptor, readahead is simply skipped.
+ LOG.info("Unable to determine FileDescriptor", e);
+ }
+ return null;
}
/**
@@ -66,6 +106,10 @@ public class IFileInputStream extends In
*/
@Override
public void close() throws IOException {
+
+ if (curReadahead != null) {
+ curReadahead.cancel();
+ }
if (currentOffset < dataLength) {
byte[] t = new byte[Math.min((int)
(Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
@@ -102,10 +146,21 @@ public class IFileInputStream extends In
if (currentOffset >= dataLength) {
return -1;
}
-
+
+ doReadahead();
+
return doRead(b,off,len);
}
+ /**
+ * Requests readahead of up to readaheadLength bytes starting at
+ * currentOffset, passing dataLength as the limit. Does nothing when
+ * readahead is disabled, no pool is available, or no file descriptor
+ * was found for the underlying stream.
+ */
+ private void doReadahead() {
+ if (!readahead || raPool == null || inFd == null) {
+ return;
+ }
+ // NOTE(review): currentOffset and dataLength are positions within this
+ // stream, while inFd refers to the whole underlying file. If the stream
+ // was positioned at a non-zero file offset before construction, the
+ // hinted region may not match the bytes read next -- confirm.
+ curReadahead = raPool.readaheadStream("ifile", inFd, currentOffset,
+ readaheadLength, dataLength, curReadahead);
+ }
+
/**
* Read bytes from the stream.
* At EOF, checksum is validated and sent back
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1373671&r1=1373670&r2=1373671&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Wed Aug 15 23:11:00 2012
@@ -84,4 +84,20 @@ public interface MRConfig {
"mapreduce.shuffle.ssl.enabled";
public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
-}
+
+ /**
+ * Configuration key to enable/disable IFile readahead.
+ */
+ public static final String MAPRED_IFILE_READAHEAD =
+ "mapreduce.ifile.readahead";
+
+ public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+ /**
+ * Configuration key to set the IFile readahead length in bytes.
+ */
+ public static final String MAPRED_IFILE_READAHEAD_BYTES =
+ "mapreduce.ifile.readahead.bytes";
+
+ public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+ 4 * 1024 * 1024;}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1373671&r1=1373670&r2=1373671&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Wed Aug 15 23:11:00 2012
@@ -98,6 +98,8 @@ class Fetcher<K,V> extends Thread {
private volatile boolean stopped = false;
+ private JobConf job;
+
private static boolean sslShuffle;
private static SSLFactory sslFactory;
@@ -105,6 +107,7 @@ class Fetcher<K,V> extends Thread {
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
+ this.job = job;
this.reporter = reporter;
this.scheduler = scheduler;
this.merger = merger;
@@ -539,7 +542,7 @@ class Fetcher<K,V> extends Thread {
int decompressedLength,
int compressedLength) throws IOException {
IFileInputStream checksumIn =
- new IFileInputStream(input, compressedLength);
+ new IFileInputStream(input, compressedLength, job);
input = checksumIn;
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1373671&r1=1373670&r2=1373671&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Wed Aug 15 23:11:00 2012
@@ -959,6 +959,20 @@
acceptable.
</description>
</property>
+
+ <property>
+ <name>mapreduce.ifile.readahead</name>
+ <value>true</value>
+ <description>Whether to enable (true) or disable (false) readahead on IFile input streams.
+ </description>
+ </property>
+
+ <property>
+ <name>mapreduce.ifile.readahead.bytes</name>
+ <value>4194304</value>
+ <description>Number of bytes to read ahead on IFile input streams when mapreduce.ifile.readahead is enabled.
+ </description>
+ </property>
<!-- Job Notification Configuration -->
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java?rev=1373671&r1=1373670&r2=1373671&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java Wed Aug 15 23:11:00 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.mapred;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -35,7 +36,7 @@ public class TestIFileStreams extends Te
ifos.close();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), DLEN + 4);
- IFileInputStream ifis = new IFileInputStream(dib, 104);
+ IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
for (int i = 0; i < DLEN; ++i) {
assertEquals(i, ifis.read());
}
@@ -54,7 +55,7 @@ public class TestIFileStreams extends Te
final byte[] b = dob.getData();
++b[17];
dib.reset(b, DLEN + 4);
- IFileInputStream ifis = new IFileInputStream(dib, 104);
+ IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
int i = 0;
try {
while (i < DLEN) {
@@ -83,7 +84,7 @@ public class TestIFileStreams extends Te
ifos.close();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(dob.getData(), DLEN + 4);
- IFileInputStream ifis = new IFileInputStream(dib, 100);
+ IFileInputStream ifis = new IFileInputStream(dib, 100, new Configuration());
int i = 0;
try {
while (i < DLEN - 8) {
|