hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1368718 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/
Date Thu, 02 Aug 2012 21:55:37 GMT
Author: sseth
Date: Thu Aug  2 21:55:37 2012
New Revision: 1368718

URL: http://svn.apache.org/viewvc?rev=1368718&view=rev
Log:
MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler. (Contributed by Todd Lipcon
and Siddharth Seth)

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1368718&r1=1368717&r2=1368718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Aug  2 21:55:37 2012
@@ -206,6 +206,9 @@ Release 2.1.0-alpha - Unreleased 
 
     MAPREDUCE-4447. Remove aop from cruft from the ant build. (eli)
 
+    MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler.
+    (Todd Lipcon and Siddharth Seth via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java?rev=1368718&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
(added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
Thu Aug  2 21:55:37 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.handler.stream.ChunkedFile;
+
+public class FadvisedChunkedFile extends ChunkedFile {
+  // ChunkedFile variant that requests readahead before each chunk is read and issues an fadvise(DONTNEED) hint on close.
+  private static final Log LOG = LogFactory.getLog(FadvisedChunkedFile.class);
+
+  private final boolean manageOsCache; // when false, neither readahead nor fadvise calls are made
+  private final int readaheadLength; // length argument passed to every readahead request
+  private final ReadaheadPool readaheadPool; // may be null, in which case readahead is skipped
+  private final FileDescriptor fd; // descriptor obtained from the RandomAccessFile given to the constructor
+  private final String identifier; // label handed to the readahead pool and included in the warning log
+
+  private ReadaheadRequest readaheadRequest; // latest request returned by the pool; null until the first nextChunk()
+  // Forwards file/position/count/chunkSize to ChunkedFile and captures the file's descriptor via getFD().
+  public FadvisedChunkedFile(RandomAccessFile file, long position, long count,
+      int chunkSize, boolean manageOsCache, int readaheadLength,
+      ReadaheadPool readaheadPool, String identifier) throws IOException {
+    super(file, position, count, chunkSize);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+  }
+  // Submits a readahead request starting at the current offset, then delegates to ChunkedFile.nextChunk().
+  @Override
+  public Object nextChunk() throws Exception {
+    if (manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool // the previous request is passed back to the pool as the last argument
+          .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
+              getEndOffset(), readaheadRequest); // end offset presumably caps the readahead range - confirm in ReadaheadPool
+    }
+    return super.nextChunk();
+  }
+  // Cancels any outstanding readahead, issues a DONTNEED hint for the served range, then delegates to ChunkedFile.close().
+  @Override
+  public void close() throws Exception {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    if (manageOsCache && getEndOffset() - getStartOffset() > 0) { // no hint is issued for an empty range
+      try {
+        NativeIO.posixFadviseIfPossible(fd, getStartOffset(), getEndOffset()
+            - getStartOffset(), NativeIO.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) { // best effort: a failed hint is only logged, so super.close() below still runs
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+    super.close(); // called last, after the fadvise call that uses fd
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java?rev=1368718&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
(added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
Thu Aug  2 21:55:37 2012
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.jboss.netty.channel.DefaultFileRegion;
+
+public class FadvisedFileRegion extends DefaultFileRegion {
+  // DefaultFileRegion variant that requests readahead on each transferTo and issues an fadvise(DONTNEED) hint on release.
+  private static final Log LOG = LogFactory.getLog(FadvisedFileRegion.class);
+
+  private final boolean manageOsCache; // when false, neither readahead nor fadvise calls are made
+  private final int readaheadLength; // length argument passed to every readahead request
+  private final ReadaheadPool readaheadPool; // may be null, in which case readahead is skipped
+  private final FileDescriptor fd; // descriptor obtained from the RandomAccessFile given to the constructor
+  private final String identifier; // label handed to the readahead pool and included in the warning log
+
+  private ReadaheadRequest readaheadRequest; // latest request returned by the pool; null until the first transferTo()
+  // Builds the region over file.getChannel() for [position, position + count) and captures the file's descriptor.
+  public FadvisedFileRegion(RandomAccessFile file, long position, long count,
+      boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
+      String identifier) throws IOException {
+    super(file.getChannel(), position, count);
+    this.manageOsCache = manageOsCache;
+    this.readaheadLength = readaheadLength;
+    this.readaheadPool = readaheadPool;
+    this.fd = file.getFD();
+    this.identifier = identifier;
+  }
+  // Submits a readahead request for the upcoming transfer, then delegates to DefaultFileRegion.transferTo().
+  @Override
+  public long transferTo(WritableByteChannel target, long position)
+      throws IOException {
+    if (manageOsCache && readaheadPool != null) {
+      readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+          getPosition() + position, readaheadLength, // position is presumably relative to the region start, hence the added getPosition()
+          getPosition() + getCount(), readaheadRequest); // region end offset, then the previous request handed back to the pool
+    }
+    return super.transferTo(target, position);
+  }
+  // Cancels any outstanding readahead, issues a DONTNEED hint for the whole region, then releases the parent's resources.
+  @Override
+  public void releaseExternalResources() {
+    if (readaheadRequest != null) {
+      readaheadRequest.cancel();
+    }
+    if (manageOsCache && getCount() > 0) { // no hint is issued for an empty region
+      try {
+        NativeIO.posixFadviseIfPossible(fd, getPosition(), getCount(),
+            NativeIO.POSIX_FADV_DONTNEED);
+      } catch (Throwable t) { // best effort: a failed hint is only logged, so the release below still runs
+        LOG.warn("Failed to manage OS cache for " + identifier, t);
+      }
+    }
+    super.releaseExternalResources(); // called last, after the fadvise call that uses fd
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1368718&r1=1368717&r2=1368718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
(original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
Thu Aug  2 21:55:37 2012
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.LocalDirAllo
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.ReadaheadPool;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.security.ssl.SSLFactory;
@@ -86,9 +87,7 @@ import org.jboss.netty.channel.ChannelHa
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.DefaultFileRegion;
 import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.FileRegion;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.jboss.netty.channel.group.ChannelGroup;
@@ -104,7 +103,6 @@ import org.jboss.netty.handler.codec.htt
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 import org.jboss.netty.util.CharsetUtil;
 
@@ -114,6 +112,12 @@ public class ShuffleHandler extends Abst
     implements AuxServices.AuxiliaryService {
 
   private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
+  
+  public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
+  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
+
+  public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
+  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
 
   private int port;
   private ChannelFactory selector;
@@ -121,6 +125,15 @@ public class ShuffleHandler extends Abst
   private HttpPipelineFactory pipelineFact;
   private int sslFileBufferSize;
 
+  /**
+   * Whether the shuffle should use posix_fadvise calls to manage the OS cache
+   * while serving map output (both the sendfile and the chunked SSL path).
+   */
+  private boolean manageOsCache;
+  private int readaheadLength;
+  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+   
+
   public static final String MAPREDUCE_SHUFFLE_SERVICEID =
       "mapreduce.shuffle";
 
@@ -242,6 +255,12 @@ public class ShuffleHandler extends Abst
 
   @Override
   public synchronized void init(Configuration conf) {
+    manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
+        DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+    readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
+        DEFAULT_SHUFFLE_READAHEAD_BYTES);
+    
     ThreadFactory bossFactory = new ThreadFactoryBuilder()
       .setNameFormat("ShuffleHandler Netty Boss #%d")
       .build();
@@ -503,14 +522,14 @@ public class ShuffleHandler extends Abst
           base + "/file.out", conf);
       LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : " +
           indexFileName);
-      IndexRecord info = 
+      final IndexRecord info = 
         indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
       final ShuffleHeader header =
         new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
       final DataOutputBuffer dob = new DataOutputBuffer();
       header.write(dob);
       ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-      File spillfile = new File(mapOutputFileName.toString());
+      final File spillfile = new File(mapOutputFileName.toString());
       RandomAccessFile spill;
       try {
         spill = new RandomAccessFile(spillfile, "r");
@@ -520,22 +539,25 @@ public class ShuffleHandler extends Abst
       }
       ChannelFuture writeFuture;
       if (ch.getPipeline().get(SslHandler.class) == null) {
-        final FileRegion partition = new DefaultFileRegion(
-            spill.getChannel(), info.startOffset, info.partLength);
+        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+            info.startOffset, info.partLength, manageOsCache, readaheadLength,
+            readaheadPool, spillfile.getAbsolutePath());
         writeFuture = ch.write(partition);
         writeFuture.addListener(new ChannelFutureListener() {
             // TODO error handling; distinguish IO/connection failures,
             //      attribute to appropriate spill output
-            @Override
-            public void operationComplete(ChannelFuture future) {
-              partition.releaseExternalResources();
-            }
-          });
+          @Override
+          public void operationComplete(ChannelFuture future) {
+            partition.releaseExternalResources();
+          }
+        });
       } else {
         // HTTPS cannot be done with zero copy.
-        writeFuture = ch.write(new ChunkedFile(spill, info.startOffset,
-                                               info.partLength,
-                                               sslFileBufferSize));
+        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+            info.startOffset, info.partLength, sslFileBufferSize,
+            manageOsCache, readaheadLength, readaheadPool,
+            spillfile.getAbsolutePath());
+        writeFuture = ch.write(chunk);
       }
       metrics.shuffleConnections.incr();
       metrics.shuffleOutputBytes.incr(info.partLength); // optimistic



Mime
View raw message