hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077009 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/
Date Fri, 04 Mar 2011 03:30:02 GMT
Author: omalley
Date: Fri Mar  4 03:30:02 2011
New Revision: 1077009

URL: http://svn.apache.org/viewvc?rev=1077009&view=rev
Log:
commit 1f2a5df1b2a40a3aebca212904165f3fd6d9f85c
Author: Arun C Murthy <acmurthy@apache.org>
Date:   Mon Sep 28 14:28:46 2009 -0700

    MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band heartbeat on task-completion
    for better job-latency. Contributed by Arun C. Murthy
    Configuration changes:
      add mapreduce.tasktracker.outofband.heartbeat
    
    from: https://issues.apache.org/jira/secure/attachment/12420718/MAPREDUCE-270_yhadoop20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +60. MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
    +    heartbeat on task-completion for better job-latency. Contributed by
    +    Arun C. Murthy
    +    Configuration changes:
    +      add mapreduce.tasktracker.outofband.heartbeat
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1077009&r1=1077008&r2=1077009&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar  4 03:30:02 2011
@@ -223,6 +223,14 @@
 </property>
 
 <property>
+  <name>mapreduce.tasktracker.outofband.heartbeat</name>
+  <value>false</value>
+  <description>Expert: Set this to true to let the tasktracker send an 
+  out-of-band heartbeat on task-completion for better latency.
+  </description>
+</property>
+
+<property>
   <name>mapred.jobtracker.restart.recover</name>
   <value>false</value>
   <description>"true" to enable (job) recovery upon restart,
@@ -611,7 +619,7 @@
   <name>mapred.heartbeats.in.second</name>
   <value>100</value>
   <description>Expert: Approximate number of heart-beats that could arrive 
-               JobTracker in a second. Assuming each RPC can be processed 
+               at JobTracker in a second. Assuming each RPC can be processed 
                in 10msec, the default value is made 100 RPCs in a second.
   </description>
 </property> 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077009&r1=1077008&r2=1077009&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:30:02 2011
@@ -133,9 +133,21 @@ public class JobTracker implements MRCon
   // The maximum number of blacklists for a tracker after which the 
   // tracker could be blacklisted across all jobs
   private int MAX_BLACKLISTS_PER_TRACKER = 4;
+  
   // Approximate number of heartbeats that could arrive JobTracker
   // in a second
-  private int NUM_HEARTBEATS_IN_SECOND = 100;
+  static final String JT_HEARTBEATS_IN_SECOND = "mapred.heartbeats.in.second";
+  private int NUM_HEARTBEATS_IN_SECOND;
+  private final int DEFAULT_NUM_HEARTBEATS_IN_SECOND = 100;
+  private final int MIN_NUM_HEARTBEATS_IN_SECOND = 1;
+  
+  // Scaling factor for heartbeats, used for testing only
+  static final String JT_HEARTBEATS_SCALING_FACTOR = 
+    "mapreduce.jobtracker.heartbeats.scaling.factor";
+  private float HEARTBEATS_SCALING_FACTOR;
+  private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
+  private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
+  
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
@@ -1908,8 +1920,19 @@ public class JobTracker implements MRCon
     MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
     MAX_BLACKLISTS_PER_TRACKER = 
         conf.getInt("mapred.max.tracker.blacklists", 4);
+    
     NUM_HEARTBEATS_IN_SECOND = 
-        conf.getInt("mapred.heartbeats.in.second", 100);
+      conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND);
+    if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) {
+      NUM_HEARTBEATS_IN_SECOND = DEFAULT_NUM_HEARTBEATS_IN_SECOND;
+    }
+    
+    HEARTBEATS_SCALING_FACTOR = 
+      conf.getFloat(JT_HEARTBEATS_SCALING_FACTOR, 
+                    DEFAULT_HEARTBEATS_SCALING_FACTOR);
+    if (HEARTBEATS_SCALING_FACTOR < MIN_HEARTBEATS_SCALING_FACTOR) {
+      HEARTBEATS_SCALING_FACTOR = DEFAULT_HEARTBEATS_SCALING_FACTOR;
+    }
 
     //This configuration is there solely for tuning purposes and 
     //once this feature has been tested in real clusters and an appropriate
@@ -2979,15 +3002,16 @@ public class JobTracker implements MRCon
   
   /**
    * Calculates next heartbeat interval using cluster size.
-   * Heartbeat interval is incremented 1second for every 50 nodes. 
+   * Heartbeat interval is incremented by 1 second for every 100 nodes by default. 
    * @return next heartbeat interval.
    */
   public int getNextHeartbeatInterval() {
     // get the no of task trackers
     int clusterSize = getClusterStatus().getTaskTrackers();
     int heartbeatInterval =  Math.max(
-                                (int)(1000 * Math.ceil((double)clusterSize / 
-                                                       NUM_HEARTBEATS_IN_SECOND)),
+                                (int)(1000 * HEARTBEATS_SCALING_FACTOR *
+                                      Math.ceil((double)clusterSize / 
+                                                NUM_HEARTBEATS_IN_SECOND)),
                                 HEARTBEAT_INTERVAL_MIN) ;
     return heartbeatInterval;
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077009&r1=1077008&r2=1077009&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 03:30:02 2011
@@ -538,7 +538,12 @@ abstract class TaskRunner extends Thread
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
-      tip.reportTaskFinished();
+      
+      // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
+      // *false* since the task has either
+      // a) SUCCEEDED - which means commit has been done
+      // b) FAILED - which means we do not need to commit
+      tip.reportTaskFinished(false);
     }
   }
   

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077009&r1=1077008&r2=1077009&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:30:02 2011
@@ -205,6 +205,16 @@ public class TaskTracker 
   private int maxMapSlots;
   private int maxReduceSlots;
   private int failures;
+  
+  // Performance-related config knob to send an out-of-band heartbeat
+  // on task completion
+  static final String TT_OUTOFBAND_HEARBEAT =
+    "mapreduce.tasktracker.outofband.heartbeat";
+  private volatile boolean oobHeartbeatOnTaskCompletion;
+  
+  // Track number of completed tasks to send an out-of-band heartbeat
+  private IntWritable finishedCount = new IntWritable(0);
+  
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
   private CleanupQueue directoryCleanupThread;
@@ -565,6 +575,9 @@ public class TaskTracker 
     if (shouldStartHealthMonitor(this.fConf)) {
       startHealthMonitor(this.fConf);
     }
+    
+    oobHeartbeatOnTaskCompletion = 
+      fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
   }
 
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
@@ -1041,8 +1054,14 @@ public class TaskTracker 
 
         long waitTime = heartbeatInterval - (now - lastHeartbeat);
         if (waitTime > 0) {
-          // sleeps for the wait time
-          Thread.sleep(waitTime);
+          // sleeps for the wait time or 
+          // until there are empty slots to schedule tasks
+          synchronized (finishedCount) {
+            if (finishedCount.get() == 0) {
+              finishedCount.wait(waitTime);
+            }
+            finishedCount.set(0);
+          }
         }
 
         // If the TaskTracker is just starting up:
@@ -1758,6 +1777,19 @@ public class TaskTracker 
     }
   }
 
+  /** 
+   * Notify the tasktracker to send an out-of-band heartbeat.
+   */
+  private void notifyTTAboutTaskCompletion() {
+    if (oobHeartbeatOnTaskCompletion) {
+      synchronized (finishedCount) {
+        int value = finishedCount.get();
+        finishedCount.set(value+1);
+        finishedCount.notify();
+      }
+    }
+  }
+  
   /**
    * The server retry loop.  
    * This while-loop attempts to connect to the JobTracker.  It only 
@@ -2100,9 +2132,21 @@ public class TaskTracker 
       return wasKilled;
     }
 
-    void reportTaskFinished() {
-      taskFinished();
-      releaseSlot();
+    /**
+     * A task is reporting in as 'done'.
+     * 
+     * We need to notify the tasktracker to send an out-of-band heartbeat.
+     * If isn't <code>commitPending</code>, we need to finalize the task
+     * and release the slot it's occupied.
+     * 
+     * @param commitPending is the task-commit pending?
+     */
+    void reportTaskFinished(boolean commitPending) {
+      if (!commitPending) {
+        taskFinished();
+        releaseSlot();
+      }
+      notifyTTAboutTaskCompletion();
     }
 
     /* State changes:
@@ -2403,6 +2447,7 @@ public class TaskTracker 
       }
       removeFromMemoryManager(task.getTaskID());
       releaseSlot();
+      notifyTTAboutTaskCompletion();
     }
     
     private synchronized void releaseSlot() {
@@ -2714,9 +2759,7 @@ public class TaskTracker 
       tip = tasks.get(taskid);
     }
     if (tip != null) {
-      if (!commitPending) {
-        tip.reportTaskFinished();
-      }
+      tip.reportTaskFinished(commitPending);
     } else {
       LOG.warn("Unknown child task finished: "+taskid+". Ignored.");
     }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java?rev=1077009&r1=1077008&r2=1077009&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMapredHeartbeat.java Fri Mar  4 03:30:02 2011
@@ -21,6 +21,9 @@ import java.io.IOException;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobConf;
 
 public class TestMapredHeartbeat extends TestCase {
@@ -42,7 +45,7 @@ public class TestMapredHeartbeat extends
       
       // test configured heartbeat interval
       taskTrackers = 5;
-      conf.setInt("mapred.heartbeats.in.second", 1);
+      conf.setInt(JobTracker.JT_HEARTBEATS_IN_SECOND, 1);
       mr = new MiniMRCluster(taskTrackers, "file:///", 3, 
           null, null, conf);
       jc = new JobClient(mr.createJobConf());
@@ -55,7 +58,7 @@ public class TestMapredHeartbeat extends
       
       // test configured heartbeat interval is capped with min value
       taskTrackers = 5;
-      conf.setInt("mapred.heartbeats.in.second", 10);
+      conf.setInt(JobTracker.JT_HEARTBEATS_IN_SECOND, 10);
       mr = new MiniMRCluster(taskTrackers, "file:///", 3, 
           null, null, conf);
       jc = new JobClient(mr.createJobConf());
@@ -68,6 +71,37 @@ public class TestMapredHeartbeat extends
       if (mr != null) { mr.shutdown(); }
     }
   }
+  
+  public void testOutOfBandHeartbeats() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    try {
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 4, true, null);
+      
+      int taskTrackers = 1;
+      JobConf jobConf = new JobConf();
+      jobConf.setFloat(JobTracker.JT_HEARTBEATS_SCALING_FACTOR, 30.0f);
+      jobConf.setBoolean(TaskTracker.TT_OUTOFBAND_HEARBEAT, true);
+      mr = new MiniMRCluster(taskTrackers, 
+                             dfs.getFileSystem().getUri().toString(), 3, 
+                             null, null, jobConf);
+      long start = System.currentTimeMillis();
+      TestMiniMRDFSSort.runRandomWriter(mr.createJobConf(), new Path("rw"));
+      long end = System.currentTimeMillis();
+      
+      final int expectedRuntimeSecs = 120;
+      final int runTimeSecs = (int)((end-start) / 1000); 
+      System.err.println("Runtime is " + runTimeSecs);
+      assertEquals("Actual runtime " + runTimeSecs + "s not less than expected " +
+                   "runtime of " + expectedRuntimeSecs + "s!", 
+                   true, (runTimeSecs <= 120));
+    } finally {
+      if (mr != null) { mr.shutdown(); }
+      if (dfs != null) { dfs.shutdown(); }
+    }
+  }
+
 }
 
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=1077009&r1=1077008&r2=1077009&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Fri Mar  4 03:30:02 2011
@@ -71,7 +71,7 @@ public class TestMiniMRDFSSort extends T
     return setup;
   }
 
-  private static void runRandomWriter(JobConf job, Path sortInput) 
+  public static void runRandomWriter(JobConf job, Path sortInput) 
   throws Exception {
     // Scale down the default settings for RandomWriter for the test-case
     // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP



Mime
View raw message