hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r616474 - in /hadoop/core/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java
Date Tue, 29 Jan 2008 18:41:07 GMT
Author: ddas
Date: Tue Jan 29 10:41:05 2008
New Revision: 616474

URL: http://svn.apache.org/viewvc?rev=616474&view=rev
Log:
HADOOP-2639. Fixes incorrect maintenance of the runningMapTasks/runningReduceTasks counters.
Contributed by Amar Kamat and Arun Murthy.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=616474&r1=616473&r2=616474&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jan 29 10:41:05 2008
@@ -597,6 +597,10 @@
     HADOOP-2713. TestDatanodeDeath failed on windows because the replication
     request was timing out. (dhruba)
 
+    HADOOP-2639. Fixes a problem to do with incorrect maintenance of values 
+    for runningMapTasks/runningReduceTasks. (Amar Kamat and Arun Murthy 
+    via ddas)
+
 Release 0.15.3 - 2008-01-18
 
   BUG FIXES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=616474&r1=616473&r2=616474&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Jan 29 10:41:05 2008
@@ -57,6 +57,8 @@
   TaskInProgress reduces[] = new TaskInProgress[0];
   int numMapTasks = 0;
   int numReduceTasks = 0;
+  
+  // Counters to track currently running/finished/failed Map/Reduce task-attempts
   int runningMapTasks = 0;
   int runningReduceTasks = 0;
   int finishedMapTasks = 0;
@@ -563,16 +565,19 @@
       return null;
     }
     
-    boolean wasRunning = maps[target].isRunning();
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (!wasRunning) {
+    if (result != null) {
       runningMapTasks += 1;
-      JobHistory.Task.logStarted(profile.getJobId(), 
-                                 maps[target].getTIPId(), Values.MAP.name(),
-                                 System.currentTimeMillis());
-    }
 
-    jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_MAPS, 1);
+      boolean wasRunning = maps[target].isRunning();
+      if (!wasRunning) {
+        JobHistory.Task.logStarted(profile.getJobId(), 
+                                   maps[target].getTIPId(), Values.MAP.name(),
+                                   System.currentTimeMillis());
+      }
+
+      jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_MAPS, 1);
+    }
 
     return result;
   }    
@@ -596,16 +601,19 @@
       return null;
     }
     
-    boolean wasRunning = reduces[target].isRunning();
     Task result = reduces[target].getTaskToRun(tts.getTrackerName());
-    if (!wasRunning) {
+    if (result != null) {
       runningReduceTasks += 1;
-      JobHistory.Task.logStarted(profile.getJobId(), 
-                                 reduces[target].getTIPId(), Values.REDUCE.name(),
-                                 System.currentTimeMillis());
+
+      boolean wasRunning = reduces[target].isRunning();
+      if (!wasRunning) {
+        JobHistory.Task.logStarted(profile.getJobId(), 
+                                   reduces[target].getTIPId(), Values.REDUCE.name(),
+                                   System.currentTimeMillis());
+      }
+
+      jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_REDUCES, 1);
     }
-    
-    jobCounters.incrCounter(Counter.TOTAL_LAUNCHED_REDUCES, 1);
 
     return result;
   }
@@ -788,6 +796,11 @@
     String taskid = status.getTaskId();
         
     // Sanity check: is the TIP already complete? 
+    // It _is_ safe to not decrement running{Map|Reduce}Tasks and
+    // finished{Map|Reduce}Tasks variables here because one and only
+    // one task-attempt of a TIP gets to completedTask. This is because
+    // the TaskCommitThread in the JobTracker marks other, completed, 
+    // speculative tasks as _complete_.
     if (tip.isComplete()) {
       // Mark this task as KILLED
       tip.alreadyCompletedTask(taskid);



Mime
View raw message