hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1368722 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/
Date Thu, 02 Aug 2012 21:57:42 GMT
Author: sseth
Date: Thu Aug  2 21:57:42 2012
New Revision: 1368722

URL: http://svn.apache.org/viewvc?rev=1368722&view=rev
Log:
Merge MAPREDUCE-3289 from trunk. Make use of fadvise in the NM's shuffle handler. (Contributed
by Todd Lipcon and Siddharth Seth)

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
      - copied unchanged from r1368718, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
      - copied unchanged from r1368718, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1368722&r1=1368721&r2=1368722&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Aug  2 21:57:42 2012
@@ -68,6 +68,9 @@ Release 2.1.0-alpha - Unreleased 
     MAPREDUCE-4427. Added an 'unmanaged' mode for AMs so as to ease
     development of new applications. (Bikas Saha via acmurthy) 
 
+    MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler.
+    (Todd Lipcon and Siddharth Seth via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1368722&r1=1368721&r2=1368722&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Aug  2 21:57:42 2012
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.LocalDirAllo
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.security.ssl.SSLFactory;
@@ -86,9 +87,7 @@ import org.jboss.netty.channel.ChannelHa
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.DefaultFileRegion;
 import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.FileRegion;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.group.ChannelGroup;
@@ -104,7 +103,6 @@ import org.jboss.netty.handler.codec.htt
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 import org.jboss.netty.util.CharsetUtil;
 
@@ -114,6 +112,12 @@ public class ShuffleHandler extends Abst
     implements AuxServices.AuxiliaryService {
 
   private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
+  
+  public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
+  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+  public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
+  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
 
   private int port;
   private ChannelFactory selector;
@@ -121,6 +125,15 @@ public class ShuffleHandler extends Abst
   private HttpPipelineFactory pipelineFact;
   private int sslFileBufferSize;
 
+  /**
+   * Should the shuffle use posix_fadvise calls to manage the OS cache during
+   * sendfile
+   */
+  private boolean manageOsCache;
+  private int readaheadLength;
+  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+   
+
   public static final String MAPREDUCE_SHUFFLE_SERVICEID =
       "mapreduce.shuffle";
 
@@ -242,6 +255,12 @@ public class ShuffleHandler extends Abst
 
   @Override
   public synchronized void init(Configuration conf) {
+    manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
+        DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+    readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
+        DEFAULT_SHUFFLE_READAHEAD_BYTES);
+    
     ThreadFactory bossFactory = new ThreadFactoryBuilder()
       .setNameFormat("ShuffleHandler Netty Boss #%d")
       .build();
@@ -503,14 +522,14 @@ public class ShuffleHandler extends Abst
           base + "/file.out", conf);
       LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : " +
           indexFileName);
-      IndexRecord info = 
+      final IndexRecord info = 
         indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
       final ShuffleHeader header =
         new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
       final DataOutputBuffer dob = new DataOutputBuffer();
       header.write(dob);
       ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-      File spillfile = new File(mapOutputFileName.toString());
+      final File spillfile = new File(mapOutputFileName.toString());
       RandomAccessFile spill;
       try {
         spill = new RandomAccessFile(spillfile, "r");
@@ -520,22 +539,25 @@ public class ShuffleHandler extends Abst
       }
       ChannelFuture writeFuture;
       if (ch.getPipeline().get(SslHandler.class) == null) {
-        final FileRegion partition = new DefaultFileRegion(
-            spill.getChannel(), info.startOffset, info.partLength);
+        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+            info.startOffset, info.partLength, manageOsCache, readaheadLength,
+            readaheadPool, spillfile.getAbsolutePath());
         writeFuture = ch.write(partition);
         writeFuture.addListener(new ChannelFutureListener() {
             // TODO error handling; distinguish IO/connection failures,
             //      attribute to appropriate spill output
-            @Override
-            public void operationComplete(ChannelFuture future) {
-              partition.releaseExternalResources();
-            }
-          });
+          @Override
+          public void operationComplete(ChannelFuture future) {
+            partition.releaseExternalResources();
+          }
+        });
       } else {
         // HTTPS cannot be done with zero copy.
-        writeFuture = ch.write(new ChunkedFile(spill, info.startOffset,
-                                               info.partLength,
-                                               sslFileBufferSize));
+        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+            info.startOffset, info.partLength, sslFileBufferSize,
+            manageOsCache, readaheadLength, readaheadPool,
+            spillfile.getAbsolutePath());
+        writeFuture = ch.write(chunk);
       }
       metrics.shuffleConnections.incr();
       metrics.shuffleOutputBytes.incr(info.partLength); // optimistic



Mime
View raw message