hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r478358 - in /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred: JobInProgress.java TaskInProgress.java TaskTracker.java
Date Wed, 22 Nov 2006 22:49:30 GMT
Author: cutting
Date: Wed Nov 22 14:49:29 2006
New Revision: 478358

URL: http://svn.apache.org/viewvc?view=rev&rev=478358
Log:
HADOOP-741.  Fix some issues with speculative execution.  Contributed by Sanjay.

Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=478358&r1=478357&r2=478358
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Nov 22 14:49:29 2006
@@ -328,8 +328,7 @@
         return null;
       }
       ArrayList mapCache = (ArrayList)hostToMaps.get(tts.getHost());
-      double avgProgress = status.mapProgress() / maps.length;
-      int target = findNewTask(tts, clusterSize, avgProgress, 
+      int target = findNewTask(tts, clusterSize, status.mapProgress(), 
                                   maps, mapCache);
       if (target == -1) {
         return null;
@@ -357,8 +356,7 @@
             return null;
         }
 
-        double avgProgress = status.reduceProgress() ;
-        int target = findNewTask(tts, clusterSize, avgProgress, 
+        int target = findNewTask(tts, clusterSize, status.reduceProgress() , 
                                     reduces, null);
         if (target == -1) {
           return null;
@@ -441,7 +439,6 @@
                          task.hasSpeculativeTask(avgProgress) && 
                          ! task.hasRunOnMachine(taskTracker)) {
                 specTarget = i;
-                break ;
               }
             }
           }
@@ -696,8 +693,8 @@
         
         // Delete temp dfs dirs created if any, like in case of 
         // speculative exn of reduces.  
-     //   String tempDir = conf.get("mapred.system.dir") + "/job_" + uniqueString; 
-     //   fs.delete(new Path(tempDir)); 
+        String tempDir = conf.get("mapred.system.dir") + "/job_" + uniqueString; 
+        fs.delete(new Path(tempDir)); 
 
       } catch (IOException e) {
         LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=478358&r1=478357&r2=478358
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Nov 22 14:49:29 2006
@@ -42,7 +42,6 @@
     static final int MAX_TASK_FAILURES = 4;    
     static final double SPECULATIVE_GAP = 0.2;
     static final long SPECULATIVE_LAG = 60 * 1000;
-    static final int MAX_CONCURRENT_TASKS = 2; 
     private static NumberFormat idFormat = NumberFormat.getInstance();
     static {
       idFormat.setMinimumIntegerDigits(6);
@@ -445,21 +444,14 @@
         // REMIND - mjc - these constants should be examined
         // in more depth eventually...
         //
-        if (isMapTask() &&
-            activeTasks.size() <= MAX_TASK_EXECS &&
+      
+      if( activeTasks.size() <= MAX_TASK_EXECS &&
             runSpeculative &&
             (averageProgress - progress >= SPECULATIVE_GAP) &&
-            (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
-            return true;
-        }else{
-          //Note: validate criteria for speculative reduce execution
-          if( runSpeculative && (activeTasks.size() < MAX_CONCURRENT_TASKS ) &&
-              (averageProgress - progress >= SPECULATIVE_GAP) &&
-              completes <= 0 &&
-              (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
-            return true ; 
-          }
-        }
+            (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) 
+            && completes == 0) {
+          return true;
+      }
         return false;
     }
     

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=478358&r1=478357&r2=478358
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Nov 22 14:49:29 2006
@@ -1026,6 +1026,17 @@
             }
             if (keepJobFiles)
               return;
+            
+            // Delete temp directory in case any task used PhasedFileSystem.
+            try{
+              String systemDir = task.getConf().get("mapred.system.dir");
+              String taskTempDir = systemDir + "/" + 
+                  task.getJobId() + "/" + task.getTipId();
+              fs.delete(new Path(taskTempDir)) ;
+            }catch(IOException e){
+              LOG.warn("Error in deleting reduce temporary output",e); 
+            }
+            
             // delete the job diretory for this task 
             // since the job is done/failed
             this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
@@ -1051,26 +1062,6 @@
                 runstate = TaskStatus.State.FAILED;
               } else {
                 runstate = TaskStatus.State.KILLED;
-              }
-            }
-            
-            // the temporary file names in speculative exn are generated in 
-            // the launched JVM, and we dont talk to it when killing so cleanup
-            // should happen here. find the task id and delete the temp directory 
-            // for the task. only for killed speculative reduce instances
-            
-            // Note: it would be better to couple this with delete localfiles
-            // which is in conf currently, it doenst belong there. 
-
-            if( !task.isMapTask() && 
-                this.defaultJobConf.getSpeculativeExecution() ){
-              try{
-                String systemDir = task.getConf().get("mapred.system.dir");
-                String taskTempDir = systemDir + "/" + 
-                    task.getJobId() + "/" + task.getTipId();
-                fs.delete(new Path(taskTempDir)) ;
-              }catch(IOException e){
-                LOG.warn("Error in deleting reduce temporary output",e); 
               }
             }
         }



Mime
View raw message