hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r819740 - in /hadoop/mapreduce/trunk: ./ src/java/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/server/jobtracker/ src/java/org/apache/hadoop/mapreduce/server/tasktracker/ src/test/mapred/org/apache/hadoop/mapred/
Date Mon, 28 Sep 2009 21:27:24 GMT
Author: acmurthy
Date: Mon Sep 28 21:27:22 2009
New Revision: 819740

URL: http://svn.apache.org/viewvc?rev=819740&view=rev
Log:
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band heartbeat on task-completion
for better job-latency. 
Configuration changes:
  add mapreduce.tasktracker.outofband.heartbeat

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=819740&r1=819739&r2=819740&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Sep 28 21:27:22 2009
@@ -10,6 +10,11 @@
 
   OPTIMIZATIONS
 
+    MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
+    heartbeat on task-completion for better job-latency. (acmurthy) 
+    Configuration changes:
+      add mapreduce.tasktracker.outofband.heartbeat 
+
   BUG FIXES
 
     MAPREDUCE-1014. Fix the libraries for common and hdfs. (omalley)

Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=819740&r1=819739&r2=819740&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Mon Sep 28 21:27:22 2009
@@ -333,6 +333,14 @@
 </property>
 
 <property>
+  <name>mapreduce.tasktracker.outofband.heartbeat</name>
+  <value>false</value>
+  <description>Expert: Set this to true to let the tasktracker send an 
+  out-of-band heartbeat on task-completion for better latency.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.jobtracker.jobhistory.lru.cache.size</name>
   <value>5</value>
   <description>The number of job history files loaded in memory. The jobs are 
@@ -665,7 +673,7 @@
   <name>mapreduce.jobtracker.heartbeats.in.second</name>
   <value>100</value>
   <description>Expert: Approximate number of heart-beats that could arrive 
-               JobTracker in a second. Assuming each RPC can be processed 
+               at JobTracker in a second. Assuming each RPC can be processed 
                in 10msec, the default value is made 100 RPCs in a second.
   </description>
 </property> 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=819740&r1=819739&r2=819740&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Sep 28 21:27:22 2009
@@ -131,9 +131,18 @@
   // The maximum number of blacklists for a tracker after which the 
   // tracker could be blacklisted across all jobs
   private int MAX_BLACKLISTS_PER_TRACKER = 4;
+  
   // Approximate number of heartbeats that could arrive at the JobTracker
   // in a second
-  private int NUM_HEARTBEATS_IN_SECOND = 100;
+  private int NUM_HEARTBEATS_IN_SECOND;
+  private final int DEFAULT_NUM_HEARTBEATS_IN_SECOND = 100;
+  private final int MIN_NUM_HEARTBEATS_IN_SECOND = 1;
+  
+  // Scaling factor for heartbeats, used for testing only
+  private float HEARTBEATS_SCALING_FACTOR;
+  private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
+  private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
+  
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
   private static final int FS_ACCESS_RETRY_PERIOD = 10000;
@@ -1294,8 +1303,21 @@
     tasktrackerExpiryInterval = 
       conf.getLong(JT_TRACKER_EXPIRY_INTERVAL, 10 * 60 * 1000);
     retiredJobsCacheSize = conf.getInt(JT_RETIREJOB_CACHE_SIZE, 1000);
-    MAX_BLACKLISTS_PER_TRACKER = conf.getInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 4);
-    NUM_HEARTBEATS_IN_SECOND = conf.getInt(JT_HEARTBEATS_IN_SECOND, 100);
+    MAX_BLACKLISTS_PER_TRACKER = 
+      conf.getInt(JTConfig.JT_MAX_TRACKER_BLACKLISTS, 4);
+    
+    NUM_HEARTBEATS_IN_SECOND = 
+      conf.getInt(JT_HEARTBEATS_IN_SECOND, DEFAULT_NUM_HEARTBEATS_IN_SECOND);
+    if (NUM_HEARTBEATS_IN_SECOND < MIN_NUM_HEARTBEATS_IN_SECOND) {
+      NUM_HEARTBEATS_IN_SECOND = DEFAULT_NUM_HEARTBEATS_IN_SECOND;
+    }
+    
+    HEARTBEATS_SCALING_FACTOR = 
+      conf.getFloat(JT_HEARTBEATS_SCALING_FACTOR, 
+                    DEFAULT_HEARTBEATS_SCALING_FACTOR);
+    if (HEARTBEATS_SCALING_FACTOR < MIN_HEARTBEATS_SCALING_FACTOR) {
+      HEARTBEATS_SCALING_FACTOR = DEFAULT_HEARTBEATS_SCALING_FACTOR;
+    }
 
     //This configuration is there solely for tuning purposes and 
     //once this feature has been tested in real clusters and an appropriate
@@ -2347,15 +2369,16 @@
   
   /**
    * Calculates next heartbeat interval using cluster size.
-   * Heartbeat interval is incremented 1second for every 50 nodes. 
+   * Heartbeat interval is incremented by 1 second for every 100 nodes by default. 
    * @return next heartbeat interval.
    */
   public int getNextHeartbeatInterval() {
     // get the no of task trackers
     int clusterSize = getClusterStatus().getTaskTrackers();
     int heartbeatInterval =  Math.max(
-                                (int)(1000 * Math.ceil((double)clusterSize / 
-                                                       NUM_HEARTBEATS_IN_SECOND)),
+                                (int)(1000 * HEARTBEATS_SCALING_FACTOR *
+                                      Math.ceil((double)clusterSize / 
+                                                NUM_HEARTBEATS_IN_SECOND)),
                                 HEARTBEAT_INTERVAL_MIN) ;
     return heartbeatInterval;
   }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=819740&r1=819739&r2=819740&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Mon Sep 28 21:27:22 2009
@@ -253,7 +253,12 @@
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
-      tip.reportTaskFinished();
+
+      // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
+      // *false* since the task has either
+      // a) SUCCEEDED - which means commit has been done
+      // b) FAILED - which means we do not need to commit
+      tip.reportTaskFinished(false);
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=819740&r1=819739&r2=819740&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Sep 28 21:27:22 2009
@@ -225,6 +225,14 @@
   private int maxMapSlots;
   private int maxReduceSlots;
   private int failures;
+  
+  // Performance-related config knob to send an out-of-band heartbeat
+  // on task completion
+  private volatile boolean oobHeartbeatOnTaskCompletion;
+  
+  // Track number of completed tasks to send an out-of-band heartbeat
+  private IntWritable finishedCount = new IntWritable(0);
+  
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
   CleanupQueue directoryCleanupThread;
@@ -636,6 +644,9 @@
     if (shouldStartHealthMonitor(this.fConf)) {
       startHealthMonitor(this.fConf);
     }
+    
+    oobHeartbeatOnTaskCompletion = 
+      fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
   }
 
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
@@ -1176,8 +1187,14 @@
 
         long waitTime = heartbeatInterval - (now - lastHeartbeat);
         if (waitTime > 0) {
-          // sleeps for the wait time
-          Thread.sleep(waitTime);
+          // sleeps for the wait time or
+          // until there are empty slots to schedule tasks
+          synchronized (finishedCount) {
+            if (finishedCount.get() == 0) {
+              finishedCount.wait(waitTime);
+            }
+            finishedCount.set(0);
+          }
         }
 
         // If the TaskTracker is just starting up:
@@ -1867,6 +1884,19 @@
     }
   }
 
+  /** 
+   * Notify the tasktracker to send an out-of-band heartbeat.
+   */
+  private void notifyTTAboutTaskCompletion() {
+    if (oobHeartbeatOnTaskCompletion) {
+      synchronized (finishedCount) {
+        int value = finishedCount.get();
+        finishedCount.set(value+1);
+        finishedCount.notify();
+      }
+    }
+  }
+  
   /**
    * The server retry loop.  
    * This while-loop attempts to connect to the JobTracker.  It only 
@@ -2185,9 +2215,21 @@
       return wasKilled;
     }
 
-    void reportTaskFinished() {
-      taskFinished();
-      releaseSlot();
+    /**
+     * A task is reporting in as 'done'.
+     * 
+     * We need to notify the tasktracker to send an out-of-band heartbeat.
+     * If the task isn't <code>commitPending</code>, we need to finalize it
+     * and release the slot it occupies.
+     * 
+     * @param commitPending is the task-commit pending?
+     */
+    void reportTaskFinished(boolean commitPending) {
+      if (!commitPending) {
+        taskFinished();
+        releaseSlot();
+      }
+      notifyTTAboutTaskCompletion();
     }
 
     /* State changes:
@@ -2490,6 +2532,7 @@
       taskStatus.setFinishTime(System.currentTimeMillis());
       removeFromMemoryManager(task.getTaskID());
       releaseSlot();
+      notifyTTAboutTaskCompletion();
     }
     
     private synchronized void releaseSlot() {
@@ -2818,9 +2861,7 @@
       tip = tasks.get(taskid);
     }
     if (tip != null) {
-      if (!commitPending) {
-        tip.reportTaskFinished();
-      }
+      tip.reportTaskFinished(commitPending);
     } else {
       LOG.warn("Unknown child task finished: "+taskid+". Ignored.");
     }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java?rev=819740&r1=819739&r2=819740&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java Mon Sep 28 21:27:22 2009
@@ -41,6 +41,8 @@
     "mapreduce.jobtracker.maxtasks.perjob";
   public static final String JT_HEARTBEATS_IN_SECOND = 
     "mapreduce.jobtracker.heartbeats.in.second";
+  public static final String JT_HEARTBEATS_SCALING_FACTOR = 
+    "mapreduce.jobtracker.heartbeats.scaling.factor";
   public static final String JT_PERSIST_JOBSTATUS = 
     "mapreduce.jobtracker.persist.jobstatus.active";
   public static final String JT_PERSIST_JOBSTATUS_HOURS = 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=819740&r1=819739&r2=819740&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Mon Sep 28 21:27:22 2009
@@ -75,5 +75,6 @@
     "mapreduce.tasktracker.taskmemorymanager.monitoringinterval";
   public static final String TT_LOCAL_CACHE_SIZE = 
     "mapreduce.tasktracker.cache.local.size";
-
+  public static final String TT_OUTOFBAND_HEARBEAT =
+    "mapreduce.tasktracker.outofband.heartbeat";
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java?rev=819740&r1=819739&r2=819740&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java Mon Sep 28 21:27:22 2009
@@ -21,8 +21,14 @@
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.RandomWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 
 public class TestMapredHeartbeat extends TestCase {
   public void testJobDirCleanup() throws IOException {
@@ -69,6 +75,37 @@
       if (mr != null) { mr.shutdown(); }
     }
   }
+  
+  public void testOutOfBandHeartbeats() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    try {
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 4, true, null);
+      
+      int taskTrackers = 1;
+      JobConf jobConf = new JobConf();
+      jobConf.setFloat(JTConfig.JT_HEARTBEATS_SCALING_FACTOR, 30.0f);
+      jobConf.setBoolean(TTConfig.TT_OUTOFBAND_HEARBEAT, true);
+      mr = new MiniMRCluster(taskTrackers, 
+                             dfs.getFileSystem().getUri().toString(), 3, 
+                             null, null, jobConf);
+      long start = System.currentTimeMillis();
+      TestMiniMRDFSSort.runRandomWriter(mr.createJobConf(), new Path("rw"));
+      long end = System.currentTimeMillis();
+      
+      final int expectedRuntimeSecs = 120;
+      final int runTimeSecs = (int)((end-start) / 1000); 
+      System.err.println("Runtime is " + runTimeSecs);
+      assertEquals("Actual runtime " + runTimeSecs + "s not less than expected " +
+      		         "runtime of " + expectedRuntimeSecs + "s!", 
+                   true, (runTimeSecs <= 120));
+    } finally {
+      if (mr != null) { mr.shutdown(); }
+      if (dfs != null) { dfs.shutdown(); }
+    }
+  }
+  
 }
 
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=819740&r1=819739&r2=819740&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Mon Sep 28 21:27:22 2009
@@ -71,7 +71,7 @@
     return setup;
   }
 
-  private static void runRandomWriter(JobConf job, Path sortInput) 
+  public static void runRandomWriter(JobConf job, Path sortInput) 
   throws Exception {
     // Scale down the default settings for RandomWriter for the test-case
     // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP



Mime
View raw message