hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1194857 - in /hadoop/common/branches/branch-0.20-security-205/src/mapred: mapred-default.xml org/apache/hadoop/mapred/TaskTracker.java
Date Sat, 29 Oct 2011 09:45:15 GMT
Author: acmurthy
Date: Sat Oct 29 09:45:15 2011
New Revision: 1194857

URL: http://svn.apache.org/viewvc?rev=1194857&view=rev
Log:
Merge -c 1194854 from branch-0.20-security to branch-0.20-security-205 to fix MAPREDUCE-2355.

Modified:
    hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml?rev=1194857&r1=1194856&r2=1194857&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/mapred-default.xml Sat Oct 29 09:45:15 2011
@@ -242,6 +242,24 @@
 </property>
 
 <property>
+  <name>mapreduce.tasktracker.outofband.heartbeat.damper</name>
+  <value>1000000</value>
+  <description>When out-of-band heartbeats are enabled, provides
+  damping to avoid overwhelming the JobTracker if too many out-of-band
+  heartbeats would occur. The damping is calculated such that the
+  heartbeat interval is divided by (T*D + 1) where T is the number
+  of completed tasks and D is the damper value.
+  
+  Setting this to a high value like the default provides no damping --
+  as soon as any task finishes, a heartbeat will be sent. Setting this
+  parameter to 0 is equivalent to disabling the out-of-band heartbeat feature.
+  A value of 1 would indicate that, after one task has completed, the
+  time to wait before the next heartbeat would be 1/2 the usual time.
+  After two tasks have finished, it would be 1/3 the usual time, etc.
+  </description>
+</property>
+
+<property>
   <name>mapred.jobtracker.restart.recover</name>
   <value>false</value>
   <description>"true" to enable (job) recovery upon restart,

Modified: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1194857&r1=1194856&r2=1194857&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Sat Oct 29 09:45:15 2011
@@ -44,6 +44,7 @@ import java.util.Vector;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import javax.crypto.SecretKey;
@@ -332,9 +333,13 @@ public class TaskTracker implements MRCo
   static final String TT_OUTOFBAND_HEARBEAT =
     "mapreduce.tasktracker.outofband.heartbeat";
   private volatile boolean oobHeartbeatOnTaskCompletion;
+  static final String TT_OUTOFBAND_HEARTBEAT_DAMPER = 
+    "mapreduce.tasktracker.outofband.heartbeat.damper";
+  static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000;
+  private volatile int oobHeartbeatDamper;
   
   // Track number of completed tasks to send an out-of-band heartbeat
-  private IntWritable finishedCount = new IntWritable(0);
+  private AtomicInteger finishedCount = new AtomicInteger(0);
   
   private MapEventsFetcherThread mapEventsFetcher;
   final int workerThreads;
@@ -833,6 +838,9 @@ public class TaskTracker implements MRCo
     
     oobHeartbeatOnTaskCompletion = 
       fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
+    oobHeartbeatDamper = 
+      fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER, 
+          DEFAULT_OOB_HEARTBEAT_DAMPER);
   }
 
   private void createInstrumentation() {
@@ -1520,25 +1528,39 @@ public class TaskTracker implements MRCo
     return recentMapEvents;
   }
 
+  private long getHeartbeatInterval(int numFinishedTasks) {
+    return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
+  }
+  
   /**
    * Main service loop.  Will stay in this loop forever.
    */
   State offerService() throws Exception {
-    long lastHeartbeat = 0;
+    long lastHeartbeat = System.currentTimeMillis();
 
     while (running && !shuttingDown) {
       try {
         long now = System.currentTimeMillis();
-
-        long waitTime = heartbeatInterval - (now - lastHeartbeat);
-        if (waitTime > 0) {
+        
+        // accelerate to account for multiple finished tasks up-front
+        long remaining = 
+          (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+        while (remaining > 0) {
           // sleeps for the wait time or 
-          // until there are empty slots to schedule tasks
+          // until there are *enough* empty slots to schedule tasks
           synchronized (finishedCount) {
-            if (finishedCount.get() == 0) {
-              finishedCount.wait(waitTime);
+            finishedCount.wait(remaining);
+            
+            // Recompute
+            now = System.currentTimeMillis();
+            remaining = 
+              (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+            
+            if (remaining <= 0) {
+              // Reset count 
+              finishedCount.set(0);
+              break;
             }
-            finishedCount.set(0);
           }
         }
 
@@ -2407,8 +2429,7 @@ public class TaskTracker implements MRCo
   private void notifyTTAboutTaskCompletion() {
     if (oobHeartbeatOnTaskCompletion) {
       synchronized (finishedCount) {
-        int value = finishedCount.get();
-        finishedCount.set(value+1);
+        finishedCount.incrementAndGet();
         finishedCount.notify();
       }
     }



Mime
View raw message