hadoop-common-commits mailing list archives

From d...@apache.org
Subject svn commit: r588588 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/Counters.java src/java/org/apache/hadoop/mapred/JobHistory.java src/java/org/apache/hadoop/mapred/JobInProgress.java
Date Fri, 26 Oct 2007 09:47:21 GMT
Author: ddas
Date: Fri Oct 26 02:47:20 2007
New Revision: 588588

URL: http://svn.apache.org/viewvc?rev=588588&view=rev
Log:
HADOOP-1210.  Log counters in job history. Contributed by Owen O'Malley

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=588588&r1=588587&r2=588588&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Oct 26 02:47:20 2007
@@ -31,6 +31,8 @@
     HADOOP-1857.  Ability to run a script when a task fails to capture stack
     traces. (Amareshwari Sri Ramadasu via ddas)
 
+    HADOOP-1210.  Log counters in job history. (Owen O'Malley via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-1898.  Release the lock protecting the last time of the last stack

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java?rev=588588&r1=588587&r2=588588&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Counters.java Fri Oct 26 02:47:20 2007
@@ -392,5 +392,29 @@
     }
     return sb.toString();
   }
-  
+
+  /**
+   * Convert a counters object into a single line that is easy to parse.
+   * @return the string with "name=value" for each counter and separated by ","
+   */
+  public String makeCompactString() {
+    StringBuffer buffer = new StringBuffer();
+    for(String groupName: getGroupNames()){
+      Counters.Group group = getGroup(groupName);
+      boolean first = true;
+      for(String counterName: group.getCounterNames()) {
+        if (first) {
+          first = false;
+        } else {
+          buffer.append(',');
+        }
+        buffer.append(groupName);
+        buffer.append('.');
+        buffer.append(counterName);
+        buffer.append('=');
+        buffer.append(group.getCounter(counterName));
+      }
+    }
+    return buffer.toString();
+  }
 }

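The compact string built above is "groupName.counterName=value" pairs joined by commas, with no escaping of the names. As a reading aid only, here is a minimal Java sketch of turning such a string back into a map; the class is hypothetical and not part of this commit, and it assumes group and counter names contain no ',' or '=' characters:

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reader for the "group.counter=value,..." string emitted by
// Counters.makeCompactString() above; not part of this commit.
public class CompactCountersReader {
  public static Map<String, Long> parse(String compact) {
    Map<String, Long> counters = new LinkedHashMap<String, Long>();
    if (compact == null || compact.length() == 0) {
      return counters;
    }
    for (String pair : compact.split(",")) {
      int eq = pair.lastIndexOf('=');
      if (eq > 0) {
        // key is "groupName.counterName", value is the counter total
        counters.put(pair.substring(0, eq),
                     Long.parseLong(pair.substring(eq + 1)));
      }
    }
    return counters;
  }
}
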
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=588588&r1=588587&r2=588588&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java Fri Oct 26 02:47:20 2007
@@ -72,17 +72,23 @@
    * Record types are identifiers for each line of log in history files. 
    * A record type appears as the first token in a single line of log. 
    */
-  public static enum RecordTypes {Jobtracker, Job, Task, MapAttempt, ReduceAttempt};
+  public static enum RecordTypes {
+    Jobtracker, Job, Task, MapAttempt, ReduceAttempt
+  }
+
   /**
    * Job history files contain key="value" pairs, where keys belong to this enum. 
    * It acts as a global namespace for all keys. 
    */
-  public static enum Keys { JOBTRACKERID,
-                            START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME, LAUNCH_TIME, 
-                            TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, FINISHED_MAPS, FINISHED_REDUCES,
-                            JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, ERROR, TASK_ATTEMPT_ID, TASK_STATUS, 
-                            COPY_PHASE, SORT_PHASE, REDUCE_PHASE, SHUFFLE_FINISHED, SORT_FINISHED
-  };
+  public static enum Keys { 
+    JOBTRACKERID,
+    START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME, 
+    LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, 
+    FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, 
+    ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
+    SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS
+  }
+
   /**
    * This enum contains some of the values commonly used by history log events. 
    * since values in history can only be strings - Values.name() is used in 
@@ -90,7 +96,8 @@
    */
   public static enum Values {
     SUCCESS, FAILED, KILLED, MAP, REDUCE
-  };
+  }
+
   // temp buffer for parsed dataa
   private static Map<Keys,String> parseBuffer = new HashMap<Keys, String>();

 
@@ -182,7 +189,8 @@
    * @param value value
    */
   
-  static void log(PrintWriter out, RecordTypes recordType, Enum key, String value){
+  static void log(PrintWriter out, RecordTypes recordType, Keys key, 
+                  String value){
     out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\""); 
     out.flush();
   }
@@ -195,7 +203,8 @@
    * @param values type of log event
    */
 
-  static void log(PrintWriter out, RecordTypes recordType, Enum[] keys, String[] values){
+  static void log(PrintWriter out, RecordTypes recordType, Keys[] keys, 
+                  String[] values){
     StringBuffer buf = new StringBuffer(recordType.name()); 
     buf.append(DELIMITER); 
     for(int i =0; i< keys.length; i++){
@@ -341,7 +350,7 @@
       if (!disableHistory){
         synchronized(MASTER_INDEX_LOG_FILE){
           JobHistory.log(masterIndex, RecordTypes.Job, 
-                         new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF }, 
+                         new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF }, 
                          new String[]{jobId, jobName, user, 
                                       String.valueOf(submitTime), jobConfPath}
                         );
@@ -355,7 +364,7 @@
           openJobs.put(logFileName, writer);
           // add to writer as well 
           JobHistory.log(writer, RecordTypes.Job, 
-                         new Enum[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF }, 
+                         new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF }, 
                          new String[]{jobId, jobName, user, 
                                       String.valueOf(submitTime) , jobConfPath}
                         ); 
@@ -400,7 +409,7 @@
       if (!disableHistory){
         synchronized(MASTER_INDEX_LOG_FILE){
           JobHistory.log(masterIndex, RecordTypes.Job, 
-                         new Enum[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES},
+                         new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES},
                          new String[] {jobId,  String.valueOf(startTime), 
                                       String.valueOf(totalMaps), String.valueOf(totalReduces)}); 
         }
@@ -410,7 +419,7 @@
         
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job, 
-                         new Enum[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES},
+                         new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES},
                         new String[] {jobId,  String.valueOf(startTime), String.valueOf(totalMaps), String.valueOf(totalReduces)}); 
         }
       }
@@ -423,15 +432,22 @@
      * @param finishedReduces no of reduces finished sucessfully. 
      * @param failedMaps no of failed map tasks. 
      * @param failedReduces no of failed reduce tasks. 
+     * @param counters the counters from the job
      */ 
-    public static void logFinished(String jobId, long finishTime, int finishedMaps, int finishedReduces,
-                                   int failedMaps, int failedReduces){
+    public static void logFinished(String jobId, long finishTime, 
+                                   int finishedMaps, int finishedReduces,
+                                   int failedMaps, int failedReduces,
+                                   Counters counters){
       if (!disableHistory){
         synchronized(MASTER_INDEX_LOG_FILE){
           JobHistory.log(masterIndex, RecordTypes.Job,          
-                         new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
-                         new String[] {jobId,  "" + finishTime, Values.SUCCESS.name(), 
-                                       String.valueOf(finishedMaps), String.valueOf(finishedReduces)});
+                         new Keys[] {Keys.JOBID, Keys.FINISH_TIME, 
+                                     Keys.JOB_STATUS, Keys.FINISHED_MAPS, 
+                                     Keys.FINISHED_REDUCES},
+                         new String[] {jobId,  "" + finishTime, 
+                                       Values.SUCCESS.name(), 
+                                       String.valueOf(finishedMaps), 
+                                       String.valueOf(finishedReduces)});
         }
         
         // close job file for this job
@@ -439,11 +455,18 @@
         PrintWriter writer = openJobs.get(logFileName); 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,          
-                         new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES,
-                                     Keys.FAILED_MAPS, Keys.FAILED_REDUCES},
-                         new String[] {jobId,  "" + finishTime, Values.SUCCESS.name(), 
-                                       String.valueOf(finishedMaps), String.valueOf(finishedReduces),
-                                       String.valueOf(failedMaps), String.valueOf(failedReduces)});
+                         new Keys[] {Keys.JOBID, Keys.FINISH_TIME, 
+                                     Keys.JOB_STATUS, Keys.FINISHED_MAPS, 
+                                     Keys.FINISHED_REDUCES,
+                                     Keys.FAILED_MAPS, Keys.FAILED_REDUCES,
+                                     Keys.COUNTERS},
+                         new String[] {jobId,  Long.toString(finishTime), 
+                                       Values.SUCCESS.name(), 
+                                       String.valueOf(finishedMaps), 
+                                       String.valueOf(finishedReduces),
+                                       String.valueOf(failedMaps), 
+                                       String.valueOf(failedReduces),
+                                       counters.makeCompactString()});
           writer.close();
           openJobs.remove(logFileName); 
         }
@@ -462,7 +485,7 @@
       if (!disableHistory){
         synchronized(MASTER_INDEX_LOG_FILE){
           JobHistory.log(masterIndex, RecordTypes.Job,
-                         new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
+                         new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
                         new String[] {jobid,  String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps), 
                                        String.valueOf(finishedReduces)}); 
         }
@@ -470,7 +493,7 @@
         PrintWriter writer = openJobs.get(logFileName); 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,
-                         new Enum[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
+                         new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
                         new String[] {jobid,  String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps), 
                                        String.valueOf(finishedReduces)}); 
           writer.close();
@@ -499,7 +522,8 @@
       if (!disableHistory){
         PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
         if (null != writer){
-          JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME}, 
+          JobHistory.log(writer, RecordTypes.Task, 
+                         new Keys[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME}, 
                          new String[]{taskId, taskType, String.valueOf(startTime)});
         }
       }
@@ -512,13 +536,17 @@
      * @param finishTime finish timeof task in ms
      */
     public static void logFinished(String jobId, String taskId, String taskType, 
-                                   long finishTime){
+                                   long finishTime, Counters counters){
       if (!disableHistory){
         PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
         if (null != writer){
-          JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE,
-                                                              Keys.TASK_STATUS, Keys.FINISH_TIME},
-                         new String[]{ taskId, taskType, Values.SUCCESS.name(), String.valueOf(finishTime)});
+          JobHistory.log(writer, RecordTypes.Task, 
+                         new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
+                                    Keys.TASK_STATUS, Keys.FINISH_TIME,
+                                    Keys.COUNTERS}, 
+                         new String[]{ taskId, taskType, Values.SUCCESS.name(), 
+                                       String.valueOf(finishTime),
+                                       counters.makeCompactString()});
         }
       }
     }
@@ -534,8 +562,9 @@
       if (!disableHistory){
         PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
         if (null != writer){
-          JobHistory.log(writer, RecordTypes.Task, new Enum[]{Keys.TASKID, Keys.TASK_TYPE,
-                                                              Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.ERROR}, 
+          JobHistory.log(writer, RecordTypes.Task, 
+                         new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
+                                    Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.ERROR}, 
                         new String[]{ taskId,  taskType, Values.FAILED.name(), String.valueOf(time), error});
         }
       }
@@ -571,8 +600,9 @@
         PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
-                         new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, 
-                                     Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
+                         new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
+                                     Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
+                                     Keys.HOSTNAME},
                          new String[]{Values.MAP.name(),  taskId, 
                                       taskAttemptId, String.valueOf(startTime), hostName});
         }
@@ -586,18 +616,22 @@
      * @param finishTime finish time
      * @param hostName host name 
      */
-    public static void logFinished(String jobId, String taskId, String taskAttemptId, long finishTime, String hostName){
+    public static void logFinished(String jobId, String taskId, 
+                                   String taskAttemptId, long finishTime, 
+                                   String hostName){
       if (!disableHistory){
         PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
-                         new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
+                         new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
+                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                      Keys.FINISH_TIME, Keys.HOSTNAME},
                          new String[]{Values.MAP.name(), taskId, taskAttemptId, Values.SUCCESS.name(),
                                       String.valueOf(finishTime), hostName}); 
         }
       }
     }
+
     /**
      * Log task attempt failed event.  
      * @param jobId jobid
@@ -613,7 +647,7 @@
         PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
-                         new Enum[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
+                         new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
                                     Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
                          new String[]{ Values.MAP.name(), taskId, taskAttemptId, Values.FAILED.name(),
                                        String.valueOf(timestamp), hostName, error}); 
@@ -635,7 +669,7 @@
         PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
-                         new Enum[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
+                         new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
                                     Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
                          new String[]{ Values.MAP.name(), taskId, taskAttemptId, Values.KILLED.name(),
                                        String.valueOf(timestamp), hostName, error}); 
@@ -662,7 +696,7 @@
         PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
-                         new Enum[]{  Keys.TASK_TYPE, Keys.TASKID, 
+                         new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
                                       Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
                          new String[]{Values.REDUCE.name(),  taskId, 
                                       taskAttemptId, String.valueOf(startTime), hostName});
@@ -679,14 +713,18 @@
      * @param finishTime finish time of task
      * @param hostName host name where task attempt executed
      */
-    public static void logFinished(String jobId, String taskId, String taskAttemptId, 
-                                   long shuffleFinished, long sortFinished, long finishTime, String hostName){
+    public static void logFinished(String jobId, String taskId, 
+                                   String taskAttemptId, long shuffleFinished, 
+                                   long sortFinished, long finishTime, 
+                                   String hostName){
       if (!disableHistory){
         PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
-                         new Enum[]{ Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS,
-                                     Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED, Keys.FINISH_TIME, Keys.HOSTNAME},
+                         new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
+                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                     Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
+                                     Keys.FINISH_TIME, Keys.HOSTNAME},
                          new String[]{Values.REDUCE.name(),  taskId, taskAttemptId, Values.SUCCESS.name(),
                                       String.valueOf(shuffleFinished), String.valueOf(sortFinished),
                                       String.valueOf(finishTime), hostName}); 
@@ -708,7 +746,7 @@
         PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
-                         new Enum[]{  Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                         new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                       Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },
                          new String[]{ Values.REDUCE.name(), taskId, taskAttemptId, Values.FAILED.name(),
                                        String.valueOf(timestamp), hostName, error }); 
@@ -730,8 +768,10 @@
         PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
-                         new Enum[]{  Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
-                                      Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },
+                         new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
+                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
+                                      Keys.ERROR },
                          new String[]{ Values.REDUCE.name(), taskId, taskAttemptId, Values.KILLED.name(),
                                        String.valueOf(timestamp), hostName, error }); 
         }
@@ -739,6 +779,7 @@
     }
 
   }
+
   /**
    * Callback interface for reading back log events from JobHistory. This interface 
    * should be implemented and passed to JobHistory.parseHistory() 
@@ -793,16 +834,21 @@
           // find job that started more than one month back and remove them
           // for jobtracker instances which dont have a job in past one month 
           // remove the jobtracker start timestamp as well.
-          for (Map<String, JobHistory.JobInfo> jobs : 
-                  jobTrackersToJobs.values()) {
-            for(Iterator iter = jobs.keySet().iterator(); iter.hasNext(); iter.next()){
-              JobHistory.JobInfo job = jobs.get(iter.next());
-              if (now - job.getLong(Keys.SUBMIT_TIME) > THIRTY_DAYS_IN_MS) {
-                iter.remove(); 
-              }
-              if (jobs.size() == 0){
-                iter.remove(); 
+          Iterator<Map<String, JobHistory.JobInfo>> jobTrackerItr =
+            jobTrackersToJobs.values().iterator();
+          while (jobTrackerItr.hasNext()) {
+            Map<String, JobHistory.JobInfo> jobs = jobTrackerItr.next();
+            Iterator<Map.Entry<String, JobHistory.JobInfo>> jobItr = 
+                   jobs.entrySet().iterator();
+            while (jobItr.hasNext()) {
+              Map.Entry<String, JobHistory.JobInfo> item = jobItr.next();
+              if (now - item.getValue().getLong(Keys.SUBMIT_TIME) > 
+                  THIRTY_DAYS_IN_MS) {
+                jobItr.remove(); 
               }
+            }
+            if (jobs.size() == 0){
+              jobTrackerItr.remove(); 
             }
           }
           masterIndex.close(); 

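Beyond the counter logging, the last hunk above reworks the HistoryCleaner loop: the old code advanced the keySet() iterator twice per pass (once in the for-update and once in the body) and ran its empty-map cleanup against the inner iterator, while the new code walks each entrySet() with one iterator per level and removes every element through the iterator that owns it. A self-contained sketch of that idiom, using placeholder types instead of JobHistory.JobInfo:

import java.util.Iterator;
import java.util.Map;

// Standalone illustration of the two-level iterator-removal idiom used in
// the HistoryCleaner rewrite above; the map values here are placeholder
// submit times in milliseconds, not JobHistory.JobInfo objects.
public class ExpireOldEntries {
  public static void expire(Map<String, Map<String, Long>> trackersToJobs,
                            long now, long maxAgeMs) {
    Iterator<Map<String, Long>> trackerItr = trackersToJobs.values().iterator();
    while (trackerItr.hasNext()) {
      Map<String, Long> jobs = trackerItr.next();
      Iterator<Map.Entry<String, Long>> jobItr = jobs.entrySet().iterator();
      while (jobItr.hasNext()) {
        Map.Entry<String, Long> job = jobItr.next();
        if (now - job.getValue() > maxAgeMs) {
          jobItr.remove();              // safe removal while iterating
        }
      }
      if (jobs.isEmpty()) {
        trackerItr.remove();            // drop trackers left with no jobs
      }
    }
  }
}
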
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=588588&r1=588587&r2=588588&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Oct 26 02:47:20 2007
@@ -254,7 +254,8 @@
       JobHistory.JobInfo.logStarted(profile.getJobId(), 
                                     System.currentTimeMillis(), 0, 0);
       JobHistory.JobInfo.logFinished(profile.getJobId(), 
-                                     System.currentTimeMillis(), 0, 0, 0, 0);
+                                     System.currentTimeMillis(), 0, 0, 0, 0,
+                                     getCounters());
       // Special case because the Job is not queued
       JobEndNotifier.registerNotification(this.getJobConf(), this.getStatus());
 
@@ -800,7 +801,8 @@
                                         tip.getTIPId(), status.getTaskId(), status.getFinishTime(),
                                         taskTrackerName); 
       JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
-                                  Values.MAP.name(), status.getFinishTime()); 
+                                  Values.MAP.name(), status.getFinishTime(),
+                                  status.getCounters()); 
     }else{
       JobHistory.ReduceAttempt.logStarted(profile.getJobId(), 
                                           tip.getTIPId(), status.getTaskId(), status.getStartTime(),
@@ -810,7 +812,8 @@
                                            status.getSortFinishTime(), status.getFinishTime(),
                                            taskTrackerName); 
       JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
-                                  Values.REDUCE.name(), status.getFinishTime()); 
+                                  Values.REDUCE.name(), status.getFinishTime(),
+                                  status.getCounters()); 
     }
         
     // Update the running/finished map/reduce counts
@@ -878,7 +881,9 @@
       LOG.info("Job " + this.status.getJobId() + 
                " has completed successfully.");
       JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime, 
-                                     this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, failedReduceTasks);
+                                     this.finishedMapTasks, 
+                                     this.finishedReduceTasks, failedMapTasks, 
+                                     failedReduceTasks, getCounters());
       metrics.completeJob();
       return true;
     }

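With these call sites wired up, the closing Job record of a successful run now carries a COUNTERS field next to the existing keys. The sample record and helper below are illustrative only: the field values are invented, and the space delimiter and double-quote style are assumptions read off the JobHistory.log() methods above, not output captured from a running cluster.

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical reading aid, not part of this commit. Given one history
// record such as (shown wrapped here; a record occupies a single line):
//   Job JOBID="job_200710260001_0001" FINISH_TIME="1193388440000"
//       JOB_STATUS="SUCCESS" FINISHED_MAPS="4" FINISHED_REDUCES="1"
//       FAILED_MAPS="0" FAILED_REDUCES="0" COUNTERS="..."
// it collects the KEY="value" fields into a map, from which the COUNTERS
// entry can be handed to a compact-string reader like the one sketched
// after the Counters.java diff above.
public class HistoryLineFields {
  private static final Pattern KV = Pattern.compile("(\\w+)=\"([^\"]*)\"");

  public static Map<String, String> toMap(String record) {
    Map<String, String> fields = new HashMap<String, String>();
    Matcher m = KV.matcher(record);
    while (m.find()) {
      fields.put(m.group(1), m.group(2));
    }
    return fields;
  }
}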

