hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1368724 - in /hadoop/common/branches/branch-1: CHANGES.txt src/mapred/org/apache/hadoop/mapred/TaskTracker.java
Date Thu, 02 Aug 2012 21:58:41 GMT
Author: sseth
Date: Thu Aug  2 21:58:40 2012
New Revision: 1368724

URL: http://svn.apache.org/viewvc?rev=1368724&view=rev
Log:
MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler. (Contributed by Todd Lipcon and Brandon Li)

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1368724&r1=1368723&r2=1368724&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Thu Aug  2 21:58:40 2012
@@ -65,6 +65,9 @@ Release 1.2.0 - unreleased
 
     HDFS-3697. Enable fadvise readahead by default. (todd via eli)
 
+    MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler.
+    (Todd Lipcon and Brandon Li via sseth)
+
   OPTIMIZATIONS
 
     HDFS-2533. Backport: Remove needless synchronization on some FSDataSet

Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1368724&r1=1368723&r2=1368724&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Aug  2 21:58:40 2012
@@ -73,7 +73,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -341,6 +344,9 @@ public class TaskTracker implements MRCo
     "mapreduce.tasktracker.outofband.heartbeat.damper";
   static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000;
   private volatile int oobHeartbeatDamper;
+  private boolean manageOsCacheInShuffle = false;
+  private int readaheadLength;
+  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
   
   // Track number of completed tasks to send an out-of-band heartbeat
   private AtomicInteger finishedCount = new AtomicInteger(0);
@@ -881,6 +887,12 @@ public class TaskTracker implements MRCo
     oobHeartbeatDamper = 
       fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER, 
           DEFAULT_OOB_HEARTBEAT_DAMPER);
+    manageOsCacheInShuffle = fConf.getBoolean(
+      "mapreduce.shuffle.manage.os.cache",
+      true);
+    readaheadLength = fConf.getInt(
+      "mapreduce.shuffle.readahead.bytes",
+      4 * 1024 * 1024);
   }
 
   private void startJettyBugMonitor() {
@@ -3978,16 +3990,30 @@ public class TaskTracker implements MRCo
          * send it to the reducer.
          */
         //open the map-output file
+        String filePath = mapOutputFileName.toUri().getPath();
         mapOutputIn = SecureIOUtils.openForRead(
-            new File(mapOutputFileName.toUri().getPath()), runAsUserName);
+            new File(filePath), runAsUserName);
+            //new File(mapOutputFileName.toUri().getPath()), runAsUserName);
 
+        ReadaheadRequest curReadahead = null;
+        
         //seek to the correct offset for the reduce
         mapOutputIn.skip(info.startOffset);
         long rem = info.partLength;
-        int len =
-          mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
-        while (rem > 0 && len >= 0) {
+        long offset = info.startOffset;
+        while (rem > 0) {
+          if (tracker.manageOsCacheInShuffle && tracker.readaheadPool != null) {
+            curReadahead = tracker.readaheadPool.readaheadStream(filePath,
+                mapOutputIn.getFD(), offset, tracker.readaheadLength,
+                info.startOffset + info.partLength, curReadahead);
+          }
+          int len = mapOutputIn.read(buffer, 0,
+              (int) Math.min(rem, MAX_BYTES_TO_READ));
+          if (len < 0) {
+            break;
+          }
           rem -= len;
+          offset += len;
           try {
             shuffleMetrics.outputBytes(len);
             outStream.write(buffer, 0, len);
@@ -3997,10 +4023,18 @@ public class TaskTracker implements MRCo
             throw ie;
           }
           totalRead += len;
-          len =
-            mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
         }
         
+        if (curReadahead != null) {
+          curReadahead.cancel();
+        }
+
+        // drop cache if possible
+        if (tracker.manageOsCacheInShuffle && info.partLength > 0) {
+          NativeIO.posixFadviseIfPossible(mapOutputIn.getFD(),
+              info.startOffset, info.partLength, NativeIO.POSIX_FADV_DONTNEED);
+        }
+
         if (LOG.isDebugEnabled()) {
           LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
                  " from map: " + mapId + " given " + info.partLength + "/" + 



Mime
View raw message