hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077531 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/JobHistory.java mapred/org/apache/hadoop/mapred/JobTracker.java test/org/apache/hadoop/mapred/TestJobHistoryConfig.java
Date Fri, 04 Mar 2011 04:24:58 GMT
Author: omalley
Date: Fri Mar  4 04:24:57 2011
New Revision: 1077531

URL: http://svn.apache.org/viewvc?rev=1077531&view=rev
Log:
commit f5a5744b89b73591fcdaec839ab1d7e7ea2a3ccf
Author: Arun C Murthy <acmurthy@apache.org>
Date:   Tue Jul 6 10:43:43 2010 -0700

    MAPREDUCE-1699. Ensure JobHistory isn't disabled for any reason. Contributed by Krishna Ramachandran.
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1699. Ensure JobHistory isn't disabled for any reason. (Krishna
    +    Ramachandran via acmurthy)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1077531&r1=1077530&r2=1077531&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Mar  4 04:24:57 2011
@@ -102,7 +102,6 @@ public class JobHistory {
   public static final int JOB_NAME_TRIM_LENGTH = 50;
   private static String JOBTRACKER_UNIQUE_STRING = null;
   private static String LOG_DIR = null;
-  private static boolean disableHistory = true; 
   private static final String SECONDARY_FILE_SUFFIX = ".recover";
   private static long jobHistoryBlockSize = 0;
   private static String jobtrackerHostname;
@@ -227,9 +226,6 @@ public class JobHistory {
     }
 
     void moveToDone(final JobID id) {
-      if (disableHistory) {
-        return;
-      }
       final List<Path> paths = new ArrayList<Path>();
       final Path historyFile = fileManager.getHistoryFile(id);
       if (historyFile == null) {
@@ -279,6 +275,10 @@ public class JobHistory {
 
       });
     }
+
+    void removeWriter(JobID jobId, PrintWriter writer) {
+      fileManager.getWriters(jobId).remove(writer);
+    }
   }
   /**
    * Record types are identifiers for each line of log in history files. 
@@ -320,72 +320,58 @@ public class JobHistory {
    * @return true if intialized properly
    *         false otherwise
    */
-  public static boolean init(JobTracker jobTracker, JobConf conf,
-             String hostname, long jobTrackerStartTime){
-    try {
-      LOG_DIR = conf.get("hadoop.job.history.location" ,
-        "file:///" + new File(
-        System.getProperty("hadoop.log.dir")).getAbsolutePath()
-        + File.separator + "history");
-      JOBTRACKER_UNIQUE_STRING = hostname + "_" + 
-                                    String.valueOf(jobTrackerStartTime) + "_";
-      jobtrackerHostname = hostname;
-      Path logDir = new Path(LOG_DIR);
-      LOGDIR_FS = logDir.getFileSystem(conf);
-      if (!LOGDIR_FS.exists(logDir)){
-        if (!LOGDIR_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
-          throw new IOException("Mkdirs failed to create " + logDir.toString());
-        }
-      }
-      conf.set("hadoop.job.history.location", LOG_DIR);
-      disableHistory = false;
-      // set the job history block size (default is 3MB)
-      jobHistoryBlockSize = 
-        conf.getLong("mapred.jobtracker.job.history.block.size", 
-                     3 * 1024 * 1024);
-      jtConf = conf;
-
-      // queue and job level security is enabled on the mapreduce cluster or not
-      aclsEnabled = conf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
-
-      // initialize the file manager
-      fileManager = new JobHistoryFilesManager(conf, jobTracker);
-    } catch(IOException e) {
-        LOG.error("Failed to initialize JobHistory log file", e); 
-        disableHistory = true;
-    }
-    return !(disableHistory);
-  }
-
-  static boolean initDone(JobConf conf, FileSystem fs){
-    try {
-      //if completed job history location is set, use that
-      String doneLocation = conf.
-                       get("mapred.job.tracker.history.completed.location");
-      if (doneLocation != null) {
-        DONE = fs.makeQualified(new Path(doneLocation));
-        DONEDIR_FS = fs;
-      } else {
-        DONE = new Path(LOG_DIR, "done");
-        DONEDIR_FS = LOGDIR_FS;
+  public static void init(JobTracker jobTracker, JobConf conf,
+             String hostname, long jobTrackerStartTime) throws IOException {
+    LOG_DIR = conf.get("hadoop.job.history.location" ,
+      "file:///" + new File(
+      System.getProperty("hadoop.log.dir")).getAbsolutePath()
+      + File.separator + "history");
+    JOBTRACKER_UNIQUE_STRING = hostname + "_" + 
+                                  String.valueOf(jobTrackerStartTime) + "_";
+    jobtrackerHostname = hostname;
+    Path logDir = new Path(LOG_DIR);
+    LOGDIR_FS = logDir.getFileSystem(conf);
+    if (!LOGDIR_FS.exists(logDir)){
+      if (!LOGDIR_FS.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
+        throw new IOException("Mkdirs failed to create " + logDir.toString());
+      }
+    }
+    conf.set("hadoop.job.history.location", LOG_DIR);
+    // set the job history block size (default is 3MB)
+    jobHistoryBlockSize = 
+      conf.getLong("mapred.jobtracker.job.history.block.size", 
+                   3 * 1024 * 1024);
+    jtConf = conf;
+
+    // queue and job level security is enabled on the mapreduce cluster or not
+    aclsEnabled = conf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
+
+    // initialize the file manager
+    fileManager = new JobHistoryFilesManager(conf, jobTracker);
+  }
+
+  static void initDone(JobConf conf, FileSystem fs) throws IOException {
+    //if completed job history location is set, use that
+    String doneLocation = conf.
+                     get("mapred.job.tracker.history.completed.location");
+    if (doneLocation != null) {
+      DONE = fs.makeQualified(new Path(doneLocation));
+      DONEDIR_FS = fs;
+    } else {
+      DONE = new Path(LOG_DIR, "done");
+      DONEDIR_FS = LOGDIR_FS;
+    }
+
+    //If not already present create the done folder with appropriate 
+    //permission
+    if (!DONEDIR_FS.exists(DONE)) {
+      LOG.info("Creating DONE folder at "+ DONE);
+      if (! DONEDIR_FS.mkdirs(DONE, 
+          new FsPermission(HISTORY_DIR_PERMISSION))) {
+        throw new IOException("Mkdirs failed to create " + DONE.toString());
       }
-
-      //If not already present create the done folder with appropriate 
-      //permission
-      if (!DONEDIR_FS.exists(DONE)) {
-        LOG.info("Creating DONE folder at "+ DONE);
-        if (! DONEDIR_FS.mkdirs(DONE, 
-            new FsPermission(HISTORY_DIR_PERMISSION))) {
-          throw new IOException("Mkdirs failed to create " + DONE.toString());
-        }
-      }
-
-      fileManager.start();
-    } catch(IOException e) {
-        LOG.error("Failed to initialize JobHistory log file", e); 
-        disableHistory = true;
     }
-    return !(disableHistory);
+    fileManager.start();
   }
 
 
@@ -436,12 +422,10 @@ public class JobHistory {
      * @param jobId job id, assigned by jobtracker. 
      */
     static void logMetaInfo(ArrayList<PrintWriter> writers){
-      if (!disableHistory){
-        if (null != writers){
-          JobHistory.log(writers, RecordTypes.Meta, 
-              new Keys[] {Keys.VERSION},
-              new String[] {String.valueOf(VERSION)}); 
-        }
+      if (null != writers){
+         JobHistory.log(writers, RecordTypes.Meta, 
+             new Keys[] {Keys.VERSION},
+             new String[] {String.valueOf(VERSION)}); 
       }
     }
   }
@@ -557,8 +541,30 @@ public class JobHistory {
    * @param values type of log event
    */
 
+  /**
+   * Log a number of keys and values with record. the array length of keys and values
+   * should be same. 
+   * @param recordType type of log event
+   * @param keys type of log event
+   * @param values type of log event
+   */
+
   static void log(ArrayList<PrintWriter> writers, RecordTypes recordType, 
                   Keys[] keys, String[] values) {
+    log(writers, recordType, keys, values, null);
+  }
+  
+  /**
+   * Log a number of keys and values with record. the array length of keys and values
+   * should be same. 
+   * @param recordType type of log event
+   * @param keys type of log event
+   * @param values type of log event
+   * @param JobID jobid of the job  
+   */
+
+  static void log(ArrayList<PrintWriter> writers, RecordTypes recordType, 
+                  Keys[] keys, String[] values, JobID id) {
 
     // First up calculate the length of buffer, so that we are performant
     // enough.
@@ -583,28 +589,14 @@ public class JobHistory {
     
     for (PrintWriter out : writers) {
       out.println(builder.toString());
+      if (out.checkError() && id != null) {
+        LOG.info("Logging failed for job " + id + "removing PrintWriter from FileManager");
+        fileManager.removeWriter(id, out);
+      }
     }
   }
   
   /**
-   * Returns history disable status. by default history is enabled so this
-   * method returns false. 
-   * @return true if history logging is disabled, false otherwise. 
-   */
-  public static boolean isDisableHistory() {
-    return disableHistory;
-  }
-
-  /**
-   * Enable/disable history logging. Default value is false, so history 
-   * is enabled by default. 
-   * @param disableHistory true if history should be disabled, false otherwise. 
-   */
-  public static void setDisableHistory(boolean disableHistory) {
-    JobHistory.disableHistory = disableHistory;
-  }
-  
-  /**
    * Get the history location
    */
   static Path getJobHistoryLocation() {
@@ -1185,104 +1177,101 @@ public class JobHistory {
       String userLogDir = null;
       String jobUniqueString = JOBTRACKER_UNIQUE_STRING + jobId;
 
-      if (!disableHistory){
-        // Get the username and job name to be used in the actual log filename;
-        // sanity check them too        
-        String jobName = getJobName(jobConf);
-
-        String user = getUserName(jobConf);
-        
-        // get the history filename
-        String logFileName = null;
-        if (restarted) {
-          logFileName = getJobHistoryFileName(jobConf, jobId);
-          if (logFileName == null) {
-            logFileName =
-              encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
-          } else {
-            String parts[] = logFileName.split("_");
-            //TODO this is a hack :(
-            // jobtracker-hostname_jobtracker-identifier_
-            String jtUniqueString = parts[0] + "_" + parts[1] + "_";
-            jobUniqueString = jtUniqueString + jobId.toString();
-          }
-        } else {
-          logFileName = 
+      // Get the username and job name to be used in the actual log filename;
+      // sanity check them too        
+      String jobName = getJobName(jobConf);
+      String user = getUserName(jobConf);
+      
+      // get the history filename
+      String logFileName = null;
+      if (restarted) {
+        logFileName = getJobHistoryFileName(jobConf, jobId);
+        if (logFileName == null) {
+          logFileName =
             encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
+        } else {
+          String parts[] = logFileName.split("_");
+          //TODO this is a hack :(
+          // jobtracker-hostname_jobtracker-identifier_
+          String jtUniqueString = parts[0] + "_" + parts[1] + "_";
+          jobUniqueString = jtUniqueString + jobId.toString();
         }
+      } else {
+        logFileName = 
+          encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
+      }
 
-        // setup the history log file for this job
-        Path logFile = getJobHistoryLogLocation(logFileName);
-        
-        // find user log directory
-        Path userLogFile = 
-          getJobHistoryLogLocationForUser(logFileName, jobConf);
-
-        try{
-          FSDataOutputStream out = null;
-          PrintWriter writer = null;
-
-          if (LOG_DIR != null) {
-            // create output stream for logging in hadoop.job.history.location
-            if (restarted) {
-              logFile = recoverJobHistoryFile(jobConf, logFile);
-              logFileName = logFile.getName();
-            }
-            
-            int defaultBufferSize = 
-              LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
-            out = LOGDIR_FS.create(logFile, 
-                            new FsPermission(HISTORY_FILE_PERMISSION),
-                            true, 
-                            defaultBufferSize, 
-                            LOGDIR_FS.getDefaultReplication(), 
-                            jobHistoryBlockSize, null);
-            writer = new PrintWriter(out);
-            fileManager.addWriter(jobId, writer);
-
-            // cache it ...
-            fileManager.setHistoryFile(jobId, logFile);
-          }
-          if (userLogFile != null) {
-            // Get the actual filename as recoverJobHistoryFile() might return
-            // a different filename
-            userLogDir = userLogFile.getParent().toString();
-            userLogFile = new Path(userLogDir, logFileName);
-            
-            // create output stream for logging 
-            // in hadoop.job.history.user.location
-            fs = userLogFile.getFileSystem(jobConf);
- 
-            out = fs.create(userLogFile, true, 4096);
-            writer = new PrintWriter(out);
-            fileManager.addWriter(jobId, writer);
+      // setup the history log file for this job
+      Path logFile = getJobHistoryLogLocation(logFileName);
+      
+      // find user log directory
+      Path userLogFile = 
+        getJobHistoryLogLocationForUser(logFileName, jobConf);
+      PrintWriter writer = null;
+      try{
+        FSDataOutputStream out = null;
+        if (LOG_DIR != null) {
+          // create output stream for logging in hadoop.job.history.location
+          if (restarted) {
+            logFile = recoverJobHistoryFile(jobConf, logFile);
+            logFileName = logFile.getName();
           }
           
-          ArrayList<PrintWriter> writers = fileManager.getWriters(jobId);
-          // Log the history meta info
-          JobHistory.MetaInfoManager.logMetaInfo(writers);
-
-          String viewJobACL = "*";
-          String modifyJobACL = "*";
-          if (aclsEnabled) {
-            viewJobACL = jobConf.get(JobACL.VIEW_JOB.getAclName(), " ");
-            modifyJobACL = jobConf.get(JobACL.MODIFY_JOB.getAclName(), " ");
-          }
-          //add to writer as well 
-          JobHistory.log(writers, RecordTypes.Job, 
-                         new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER,
-                                    Keys.SUBMIT_TIME, Keys.JOBCONF,
-                                    Keys.VIEW_JOB, Keys.MODIFY_JOB,
-                                    Keys.JOB_QUEUE}, 
-                         new String[]{jobId.toString(), jobName, user, 
-                                      String.valueOf(submitTime) , jobConfPath,
-                                      viewJobACL, modifyJobACL,
-                                      jobConf.getQueueName()}
-                        ); 
-             
-        }catch(IOException e){
-          LOG.error("Failed creating job history log file, disabling history", e);
-          disableHistory = true; 
+          int defaultBufferSize = 
+            LOGDIR_FS.getConf().getInt("io.file.buffer.size", 4096);
+          out = LOGDIR_FS.create(logFile, 
+                          new FsPermission(HISTORY_FILE_PERMISSION),
+                          true, 
+                          defaultBufferSize, 
+                          LOGDIR_FS.getDefaultReplication(), 
+                          jobHistoryBlockSize, null);
+          writer = new PrintWriter(out);
+          fileManager.addWriter(jobId, writer);
+
+          // cache it ...
+          fileManager.setHistoryFile(jobId, logFile);
+        }
+        if (userLogFile != null) {
+          // Get the actual filename as recoverJobHistoryFile() might return
+          // a different filename
+          userLogDir = userLogFile.getParent().toString();
+          userLogFile = new Path(userLogDir, logFileName);
+          
+          // create output stream for logging 
+          // in hadoop.job.history.user.location
+          fs = userLogFile.getFileSystem(jobConf);
+ 
+          out = fs.create(userLogFile, true, 4096);
+          writer = new PrintWriter(out);
+          fileManager.addWriter(jobId, writer);
+        }
+        
+        ArrayList<PrintWriter> writers = fileManager.getWriters(jobId);
+        // Log the history meta info
+        JobHistory.MetaInfoManager.logMetaInfo(writers);
+
+        String viewJobACL = "*";
+        String modifyJobACL = "*";
+        if (aclsEnabled) {
+          viewJobACL = jobConf.get(JobACL.VIEW_JOB.getAclName(), " ");
+          modifyJobACL = jobConf.get(JobACL.MODIFY_JOB.getAclName(), " ");
+        }
+        //add to writer as well 
+        JobHistory.log(writers, RecordTypes.Job,
+                       new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER,
+                                  Keys.SUBMIT_TIME, Keys.JOBCONF,
+                                  Keys.VIEW_JOB, Keys.MODIFY_JOB,
+                                  Keys.JOB_QUEUE}, 
+                       new String[]{jobId.toString(), jobName, user, 
+                                    String.valueOf(submitTime) , jobConfPath,
+                                    viewJobACL, modifyJobACL,
+                                    jobConf.getQueueName()}, jobId
+                      ); 
+           
+      }catch(IOException e){
+        LOG.error("Failed creating job history log file for job " + jobId, e);
+        if (writer != null) {
+          fileManager.removeWriter(jobId, writer);
         }
       }
       // Always store job conf on local file system 
@@ -1369,18 +1358,16 @@ public class JobHistory {
      */
     public static void logInited(JobID jobId, long startTime, 
                                  int totalMaps, int totalReduces) {
-      if (!disableHistory){
-        ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 
+      ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.Job, 
-              new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, 
-                          Keys.TOTAL_REDUCES, Keys.JOB_STATUS},
-              new String[] {jobId.toString(), String.valueOf(startTime), 
-                            String.valueOf(totalMaps), 
-                            String.valueOf(totalReduces), 
-                            Values.PREP.name()}); 
-        }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.Job, 
+            new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, 
+                        Keys.TOTAL_REDUCES, Keys.JOB_STATUS},
+            new String[] {jobId.toString(), String.valueOf(startTime), 
+                          String.valueOf(totalMaps), 
+                          String.valueOf(totalReduces), 
+                          Values.PREP.name()}, jobId); 
       }
     }
     
@@ -1405,15 +1392,13 @@ public class JobHistory {
      * @param jobId job id, assigned by jobtracker. 
      */
     public static void logStarted(JobID jobId){
-      if (!disableHistory){
-        ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 
+      ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.Job, 
-              new Keys[] {Keys.JOBID, Keys.JOB_STATUS},
-              new String[] {jobId.toString(),  
-                            Values.RUNNING.name()}); 
-        }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.Job, 
+            new Keys[] {Keys.JOBID, Keys.JOB_STATUS},
+            new String[] {jobId.toString(),  
+                          Values.RUNNING.name()}, jobId); 
       }
     }
     
@@ -1433,34 +1418,32 @@ public class JobHistory {
                                    Counters mapCounters,
                                    Counters reduceCounters,
                                    Counters counters) {
-      if (!disableHistory){
         // close job file for this job
-        ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 
+      ArrayList<PrintWriter> writer = fileManager.getWriters(jobId); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.Job,          
-                         new Keys[] {Keys.JOBID, Keys.FINISH_TIME, 
-                                     Keys.JOB_STATUS, Keys.FINISHED_MAPS, 
-                                     Keys.FINISHED_REDUCES,
-                                     Keys.FAILED_MAPS, Keys.FAILED_REDUCES,
-                                     Keys.MAP_COUNTERS, Keys.REDUCE_COUNTERS,
-                                     Keys.COUNTERS},
-                         new String[] {jobId.toString(),  Long.toString(finishTime), 
-                                       Values.SUCCESS.name(), 
-                                       String.valueOf(finishedMaps), 
-                                       String.valueOf(finishedReduces),
-                                       String.valueOf(failedMaps), 
-                                       String.valueOf(failedReduces),
-                                       mapCounters.makeEscapedCompactString(),
-                                       reduceCounters.makeEscapedCompactString(),
-                                       counters.makeEscapedCompactString()});
-          for (PrintWriter out : writer) {
-            out.close();
-          }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.Job,          
+                       new Keys[] {Keys.JOBID, Keys.FINISH_TIME, 
+                                   Keys.JOB_STATUS, Keys.FINISHED_MAPS, 
+                                   Keys.FINISHED_REDUCES,
+                                   Keys.FAILED_MAPS, Keys.FAILED_REDUCES,
+                                   Keys.MAP_COUNTERS, Keys.REDUCE_COUNTERS,
+                                   Keys.COUNTERS},
+                       new String[] {jobId.toString(),  Long.toString(finishTime), 
+                                     Values.SUCCESS.name(), 
+                                     String.valueOf(finishedMaps), 
+                                     String.valueOf(finishedReduces),
+                                     String.valueOf(failedMaps), 
+                                     String.valueOf(failedReduces),
+                                     mapCounters.makeEscapedCompactString(),
+                                     reduceCounters.makeEscapedCompactString(),
+                                     counters.makeEscapedCompactString()}, jobId);
+        for (PrintWriter out : writer) {
+          out.close();
         }
-        Thread historyCleaner  = new Thread(new HistoryCleaner());
-        historyCleaner.start(); 
       }
+      Thread historyCleaner  = new Thread(new HistoryCleaner());
+      historyCleaner.start(); 
     }
     /**
      * Logs job failed event. Closes the job history log file. 
@@ -1470,17 +1453,15 @@ public class JobHistory {
      * @param finishedReduces no of finished reduce tasks. 
      */
     public static void logFailed(JobID jobid, long timestamp, int finishedMaps, int finishedReduces){
-      if (!disableHistory){
-        ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 
+      ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.Job,
-                         new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
-                         new String[] {jobid.toString(),  String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps), 
-                                       String.valueOf(finishedReduces)}); 
-          for (PrintWriter out : writer) {
-            out.close();
-          }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.Job,
+                       new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
+                       new String[] {jobid.toString(),  String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps), 
+                                     String.valueOf(finishedReduces)}, jobid); 
+        for (PrintWriter out : writer) {
+          out.close();
         }
       }
     }
@@ -1498,18 +1479,16 @@ public class JobHistory {
      */
     public static void logKilled(JobID jobid, long timestamp, int finishedMaps,
         int finishedReduces) {
-      if (!disableHistory) {
-        ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
+      ArrayList<PrintWriter> writer = fileManager.getWriters(jobid);
 
-        if (null != writer) {
-          JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID,
-              Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS,
-              Keys.FINISHED_REDUCES }, new String[] { jobid.toString(),
-              String.valueOf(timestamp), Values.KILLED.name(),
-              String.valueOf(finishedMaps), String.valueOf(finishedReduces) });
-          for (PrintWriter out : writer) {
-            out.close();
-          }
+      if (null != writer) {
+        JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID,
+            Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS,
+            Keys.FINISHED_REDUCES }, new String[] { jobid.toString(),
+            String.valueOf(timestamp), Values.KILLED.name(),
+            String.valueOf(finishedMaps), String.valueOf(finishedReduces) }, jobid);
+        for (PrintWriter out : writer) {
+          out.close();
         }
       }
     }
@@ -1519,14 +1498,12 @@ public class JobHistory {
      * @param priority Jobs priority 
      */
     public static void logJobPriority(JobID jobid, JobPriority priority){
-      if (!disableHistory){
-        ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 
+      ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.Job,
-                         new Keys[] {Keys.JOBID, Keys.JOB_PRIORITY},
-                         new String[] {jobid.toString(), priority.toString()});
-        }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.Job,
+                       new Keys[] {Keys.JOBID, Keys.JOB_PRIORITY},
+                       new String[] {jobid.toString(), priority.toString()}, jobid);
       }
     }
     /**
@@ -1545,17 +1522,15 @@ public class JobHistory {
 
     public static void logJobInfo(JobID jobid, long submitTime, long launchTime)
     {
-      if (!disableHistory){
-        ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 
+      ArrayList<PrintWriter> writer = fileManager.getWriters(jobid); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.Job,
-                         new Keys[] {Keys.JOBID, Keys.SUBMIT_TIME, 
-                                     Keys.LAUNCH_TIME},
-                         new String[] {jobid.toString(), 
-                                       String.valueOf(submitTime), 
-                                       String.valueOf(launchTime)});
-        }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.Job,
+                       new Keys[] {Keys.JOBID, Keys.SUBMIT_TIME, 
+                                   Keys.LAUNCH_TIME},
+                       new String[] {jobid.toString(), 
+                                     String.valueOf(submitTime), 
+                                     String.valueOf(launchTime)}, jobid);
       }
     }
   }
@@ -1576,18 +1551,16 @@ public class JobHistory {
      */
     public static void logStarted(TaskID taskId, String taskType, 
                                   long startTime, String splitLocations) {
-      if (!disableHistory){
-        JobID id = taskId.getJobID();
-        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
+      JobID id = taskId.getJobID();
+      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.Task, 
-                         new Keys[]{Keys.TASKID, Keys.TASK_TYPE ,
-                                    Keys.START_TIME, Keys.SPLITS}, 
-                         new String[]{taskId.toString(), taskType,
-                                      String.valueOf(startTime),
-                                      splitLocations});
-        }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.Task, 
+                       new Keys[]{Keys.TASKID, Keys.TASK_TYPE ,
+                                  Keys.START_TIME, Keys.SPLITS}, 
+                       new String[]{taskId.toString(), taskType,
+                                    String.valueOf(startTime),
+                                    splitLocations}, id);
       }
     }
     /**
@@ -1598,19 +1571,17 @@ public class JobHistory {
      */
     public static void logFinished(TaskID taskId, String taskType, 
                                    long finishTime, Counters counters){
-      if (!disableHistory){
-        JobID id = taskId.getJobID();
-        ArrayList<PrintWriter> writer = fileManager.getWriters(id);
+      JobID id = taskId.getJobID();
+      ArrayList<PrintWriter> writer = fileManager.getWriters(id);
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.Task, 
-                         new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
-                                    Keys.TASK_STATUS, Keys.FINISH_TIME,
-                                    Keys.COUNTERS}, 
-                         new String[]{ taskId.toString(), taskType, Values.SUCCESS.name(), 
-                                       String.valueOf(finishTime),
-                                       counters.makeEscapedCompactString()});
-        }
+      if (null != writer){
+         JobHistory.log(writer, RecordTypes.Task, 
+                        new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
+                                   Keys.TASK_STATUS, Keys.FINISH_TIME,
+                                   Keys.COUNTERS}, 
+                        new String[]{ taskId.toString(), taskType, Values.SUCCESS.name(), 
+                                      String.valueOf(finishTime),
+                                      counters.makeEscapedCompactString()}, id);
       }
     }
 
@@ -1620,16 +1591,14 @@ public class JobHistory {
      * @param finishTime finish time of task in ms
      */
     public static void logUpdates(TaskID taskId, long finishTime){
-      if (!disableHistory){
-        JobID id = taskId.getJobID();
-        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
+      JobID id = taskId.getJobID();
+      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.Task, 
-                         new Keys[]{Keys.TASKID, Keys.FINISH_TIME}, 
-                         new String[]{ taskId.toString(), 
-                                       String.valueOf(finishTime)});
-        }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.Task, 
+                       new Keys[]{Keys.TASKID, Keys.FINISH_TIME}, 
+                       new String[]{ taskId.toString(), 
+                                     String.valueOf(finishTime)}, id);
       }
     }
 
@@ -1650,23 +1619,21 @@ public class JobHistory {
     public static void logFailed(TaskID taskId, String taskType, long time,
                                  String error, 
                                  TaskAttemptID failedDueToAttempt){
-      if (!disableHistory){
-        JobID id = taskId.getJobID();
-        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
+      JobID id = taskId.getJobID();
+      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
-        if (null != writer){
-          String failedAttempt = failedDueToAttempt == null
-                                 ? ""
-                                 : failedDueToAttempt.toString();
-          JobHistory.log(writer, RecordTypes.Task, 
-                         new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
-                                    Keys.TASK_STATUS, Keys.FINISH_TIME, 
-                                    Keys.ERROR, Keys.TASK_ATTEMPT_ID}, 
-                         new String[]{ taskId.toString(),  taskType, 
-                                      Values.FAILED.name(), 
-                                      String.valueOf(time) , error, 
-                                      failedAttempt});
-        }
+      if (null != writer){
+        String failedAttempt = failedDueToAttempt == null
+                               ? ""
+                               : failedDueToAttempt.toString();
+        JobHistory.log(writer, RecordTypes.Task, 
+                       new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
+                                  Keys.TASK_STATUS, Keys.FINISH_TIME, 
+                                  Keys.ERROR, Keys.TASK_ATTEMPT_ID}, 
+                       new String[]{ taskId.toString(),  taskType, 
+                                    Values.FAILED.name(), 
+                                    String.valueOf(time) , error, 
+                                    failedAttempt}, id);
       }
     }
     /**
@@ -1712,22 +1679,20 @@ public class JobHistory {
     public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
                                   String trackerName, int httpPort, 
                                   String taskType) {
-      if (!disableHistory){
-        JobID id = taskAttemptId.getJobID();
-        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
+      JobID id = taskAttemptId.getJobID();
+      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.MapAttempt, 
-                         new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
-                                     Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
-                                     Keys.TRACKER_NAME, Keys.HTTP_PORT},
-                         new String[]{taskType,
-                                      taskAttemptId.getTaskID().toString(), 
-                                      taskAttemptId.toString(), 
-                                      String.valueOf(startTime), trackerName,
-                                      httpPort == -1 ? "" : 
-                                        String.valueOf(httpPort)}); 
-        }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.MapAttempt, 
+                       new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
+                                   Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
+                                   Keys.TRACKER_NAME, Keys.HTTP_PORT},
+                       new String[]{taskType,
+                                    taskAttemptId.getTaskID().toString(), 
+                                    taskAttemptId.toString(), 
+                                    String.valueOf(startTime), trackerName,
+                                    httpPort == -1 ? "" : 
+                                      String.valueOf(httpPort)}, id); 
       }
     }
     
@@ -1762,24 +1727,22 @@ public class JobHistory {
                                    String taskType,
                                    String stateString, 
                                    Counters counter) {
-      if (!disableHistory){
-        JobID id = taskAttemptId.getJobID();
-        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
+      JobID id = taskAttemptId.getJobID();
+      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.MapAttempt, 
-                         new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
-                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
-                                     Keys.FINISH_TIME, Keys.HOSTNAME, 
-                                     Keys.STATE_STRING, Keys.COUNTERS},
-                         new String[]{taskType, 
-                                      taskAttemptId.getTaskID().toString(),
-                                      taskAttemptId.toString(), 
-                                      Values.SUCCESS.name(),  
-                                      String.valueOf(finishTime), hostName, 
-                                      stateString, 
-                                      counter.makeEscapedCompactString()}); 
-        }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.MapAttempt, 
+                       new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
+                                   Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                   Keys.FINISH_TIME, Keys.HOSTNAME, 
+                                   Keys.STATE_STRING, Keys.COUNTERS},
+                       new String[]{taskType, 
+                                    taskAttemptId.getTaskID().toString(),
+                                    taskAttemptId.toString(), 
+                                    Values.SUCCESS.name(),  
+                                    String.valueOf(finishTime), hostName, 
+                                    stateString, 
+                                    counter.makeEscapedCompactString()}, id); 
       }
     }
 
@@ -1811,22 +1774,20 @@ public class JobHistory {
     public static void logFailed(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, 
                                  String error, String taskType) {
-      if (!disableHistory){
-        JobID id = taskAttemptId.getJobID();
-        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
+      JobID id = taskAttemptId.getJobID();
+      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.MapAttempt, 
-                         new Keys[]{Keys.TASK_TYPE, Keys.TASKID, 
-                                    Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
-                                    Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
-                         new String[]{ taskType, 
-                                       taskAttemptId.getTaskID().toString(),
-                                       taskAttemptId.toString(), 
-                                       Values.FAILED.name(),
-                                       String.valueOf(timestamp), 
-                                       hostName, error}); 
-        }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.MapAttempt, 
+                       new Keys[]{Keys.TASK_TYPE, Keys.TASKID, 
+                                  Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                  Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
+                       new String[]{ taskType, 
+                                     taskAttemptId.getTaskID().toString(),
+                                     taskAttemptId.toString(), 
+                                     Values.FAILED.name(),
+                                     String.valueOf(timestamp), 
+                                     hostName, error}, id); 
       }
     }
     
@@ -1857,23 +1818,21 @@ public class JobHistory {
     public static void logKilled(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName,
                                  String error, String taskType) {
-      if (!disableHistory){
-        JobID id = taskAttemptId.getJobID();
-        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
+      JobID id = taskAttemptId.getJobID();
+      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.MapAttempt, 
-                         new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
-                                    Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
-                                    Keys.FINISH_TIME, Keys.HOSTNAME,
-                                    Keys.ERROR},
-                         new String[]{ taskType, 
-                                       taskAttemptId.getTaskID().toString(), 
-                                       taskAttemptId.toString(),
-                                       Values.KILLED.name(),
-                                       String.valueOf(timestamp), 
-                                       hostName, error}); 
-        }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.MapAttempt, 
+                       new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
+                                  Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                  Keys.FINISH_TIME, Keys.HOSTNAME,
+                                  Keys.ERROR},
+                       new String[]{ taskType, 
+                                     taskAttemptId.getTaskID().toString(), 
+                                     taskAttemptId.toString(),
+                                     Values.KILLED.name(),
+                                     String.valueOf(timestamp), 
+                                     hostName, error}, id); 
       }
     } 
   }
@@ -1909,62 +1868,59 @@ public class JobHistory {
                                   long startTime, String trackerName, 
                                   int httpPort, 
                                   String taskType) {
-      if (!disableHistory){
-        JobID id = taskAttemptId.getJobID();
-        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
+              JobID id = taskAttemptId.getJobID();
+	      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.ReduceAttempt, 
-                         new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
-                                      Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
-                                      Keys.TRACKER_NAME, Keys.HTTP_PORT},
-                         new String[]{taskType,
-                                      taskAttemptId.getTaskID().toString(), 
-                                      taskAttemptId.toString(), 
-                                      String.valueOf(startTime), trackerName,
-                                      httpPort == -1 ? "" : 
-                                        String.valueOf(httpPort)}); 
-        }
-      }
-    }
-    
-    /**
-     * Log finished event of this task. 
-     * @param taskAttemptId task attempt id
-     * @param shuffleFinished shuffle finish time
-     * @param sortFinished sort finish time
-     * @param finishTime finish time of task
-     * @param hostName host name where task attempt executed
-     * @deprecated Use 
-     * {@link #logFinished(TaskAttemptID, long, long, long, String, String, String, Counters)}
-     */
-    @Deprecated
-    public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished, 
-                                   long sortFinished, long finishTime, 
-                                   String hostName){
-      logFinished(taskAttemptId, shuffleFinished, sortFinished, 
-                  finishTime, hostName, Values.REDUCE.name(),
-                  "", new Counters());
-    }
-    
-    /**
-     * Log finished event of this task. 
-     * 
-     * @param taskAttemptId task attempt id
-     * @param shuffleFinished shuffle finish time
-     * @param sortFinished sort finish time
-     * @param finishTime finish time of task
-     * @param hostName host name where task attempt executed
-     * @param taskType Whether the attempt is cleanup or setup or reduce 
-     * @param stateString the state string of the attempt
-     * @param counter counters of the attempt
-     */
-    public static void logFinished(TaskAttemptID taskAttemptId, 
-                                   long shuffleFinished, 
-                                   long sortFinished, long finishTime, 
-                                   String hostName, String taskType,
-                                   String stateString, Counters counter) {
-      if (!disableHistory){
+              if (null != writer){
+		  JobHistory.log(writer, RecordTypes.ReduceAttempt, 
+				 new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
+					      Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
+					      Keys.TRACKER_NAME, Keys.HTTP_PORT},
+				 new String[]{taskType,
+					      taskAttemptId.getTaskID().toString(), 
+					      taskAttemptId.toString(), 
+					      String.valueOf(startTime), trackerName,
+					      httpPort == -1 ? "" : 
+						String.valueOf(httpPort)}, id); 
+              }
+	    }
+	    
+	    /**
+	     * Log finished event of this task. 
+	     * @param taskAttemptId task attempt id
+	     * @param shuffleFinished shuffle finish time
+	     * @param sortFinished sort finish time
+	     * @param finishTime finish time of task
+	     * @param hostName host name where task attempt executed
+	     * @deprecated Use 
+	     * {@link #logFinished(TaskAttemptID, long, long, long, String, String, String, Counters)}
+	     */
+	    @Deprecated
+	    public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished, 
+					   long sortFinished, long finishTime, 
+					   String hostName){
+	      logFinished(taskAttemptId, shuffleFinished, sortFinished, 
+			  finishTime, hostName, Values.REDUCE.name(),
+			  "", new Counters());
+	    }
+	    
+	    /**
+	     * Log finished event of this task. 
+	     * 
+	     * @param taskAttemptId task attempt id
+	     * @param shuffleFinished shuffle finish time
+	     * @param sortFinished sort finish time
+	     * @param finishTime finish time of task
+	     * @param hostName host name where task attempt executed
+	     * @param taskType Whether the attempt is cleanup or setup or reduce 
+	     * @param stateString the state string of the attempt
+	     * @param counter counters of the attempt
+	     */
+	    public static void logFinished(TaskAttemptID taskAttemptId, 
+					   long shuffleFinished, 
+					   long sortFinished, long finishTime, 
+					   String hostName, String taskType,
+					   String stateString, Counters counter) {
         JobID id = taskAttemptId.getJobID();
         ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
@@ -1983,9 +1939,8 @@ public class JobHistory {
                                       String.valueOf(sortFinished),
                                       String.valueOf(finishTime), hostName,
                                       stateString, 
-                                      counter.makeEscapedCompactString()}); 
+                                      counter.makeEscapedCompactString()}, id); 
         }
-      }
     }
     
     /**
@@ -2015,22 +1970,20 @@ public class JobHistory {
     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error, 
                                  String taskType) {
-      if (!disableHistory){
-        JobID id = taskAttemptId.getJobID();
-        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
+      JobID id = taskAttemptId.getJobID();
+      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.ReduceAttempt, 
-                         new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
-                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
-                                      Keys.FINISH_TIME, Keys.HOSTNAME,
-                                      Keys.ERROR },
-                         new String[]{ taskType, 
-                                       taskAttemptId.getTaskID().toString(), 
-                                       taskAttemptId.toString(), 
-                                       Values.FAILED.name(), 
-                                       String.valueOf(timestamp), hostName, error }); 
-        }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.ReduceAttempt, 
+                       new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
+                                    Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                    Keys.FINISH_TIME, Keys.HOSTNAME,
+                                    Keys.ERROR },
+                       new String[]{ taskType, 
+                                     taskAttemptId.getTaskID().toString(), 
+                                     taskAttemptId.toString(), 
+                                     Values.FAILED.name(), 
+                                     String.valueOf(timestamp), hostName, error }, id); 
       }
     }
     
@@ -2061,23 +2014,21 @@ public class JobHistory {
     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error, 
                                  String taskType) {
-      if (!disableHistory){
-        JobID id = taskAttemptId.getJobID();
-        ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
+      JobID id = taskAttemptId.getJobID();
+      ArrayList<PrintWriter> writer = fileManager.getWriters(id); 
 
-        if (null != writer){
-          JobHistory.log(writer, RecordTypes.ReduceAttempt, 
-                         new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
-                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
-                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
-                                      Keys.ERROR },
-                         new String[]{ taskType,
-                                       taskAttemptId.getTaskID().toString(), 
-                                       taskAttemptId.toString(), 
-                                       Values.KILLED.name(), 
-                                       String.valueOf(timestamp), 
-                                       hostName, error }); 
-        }
+      if (null != writer){
+        JobHistory.log(writer, RecordTypes.ReduceAttempt, 
+                       new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
+                                    Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                    Keys.FINISH_TIME, Keys.HOSTNAME, 
+                                    Keys.ERROR },
+                       new String[]{ taskType,
+                                     taskAttemptId.getTaskID().toString(), 
+                                     taskAttemptId.toString(), 
+                                     Values.KILLED.name(), 
+                                     String.valueOf(timestamp), 
+                                     hostName, error }, id); 
       }
     }
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077531&r1=1077530&r2=1077531&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 04:24:57 2011
@@ -2126,14 +2126,14 @@ public class JobTracker implements MRCon
     infoServer.setAttribute("job.tracker", this);
     // initialize history parameters.
     final JobTracker jtFinal = this;
-    boolean historyInitialized = 
-      getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
-        @Override
-        public Boolean run() throws Exception {
-          return JobHistory.init(jtFinal, conf,jtFinal.localMachine, 
-              jtFinal.startTime);
-        }
-      });
+    getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        JobHistory.init(jtFinal, conf,jtFinal.localMachine, 
+            jtFinal.startTime);
+        return true;
+      }
+    });
     
     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
@@ -2202,7 +2202,6 @@ public class JobTracker implements MRCon
         // Check if the history is enabled .. as we cant have persistence with 
         // history disabled
         if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
-            && !JobHistory.isDisableHistory()
             && systemDirData != null) {
           for (FileStatus status : systemDirData) {
             try {
@@ -2249,20 +2248,18 @@ public class JobTracker implements MRCon
     }
 
     // Initialize history DONE folder
-    if (historyInitialized) {
-      FileSystem historyFS = getMROwner().doAs(
-          new PrivilegedExceptionAction<FileSystem>() {
-        public FileSystem run() throws IOException {
-          JobHistory.initDone(conf, fs);
-          final String historyLogDir = 
-            JobHistory.getCompletedJobHistoryLocation().toString();
-          infoServer.setAttribute("historyLogDir", historyLogDir);
-          
-          return new Path(historyLogDir).getFileSystem(conf);
-        }
-      });
-      infoServer.setAttribute("fileSys", historyFS);
-    }
+    FileSystem historyFS = getMROwner().doAs(
+        new PrivilegedExceptionAction<FileSystem>() {
+      public FileSystem run() throws IOException {
+        JobHistory.initDone(conf, fs);
+        final String historyLogDir = 
+          JobHistory.getCompletedJobHistoryLocation().toString();
+        infoServer.setAttribute("historyLogDir", historyLogDir);
+        
+        return new Path(historyLogDir).getFileSystem(conf);
+      }
+    });
+    infoServer.setAttribute("fileSys", historyFS);
 
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java?rev=1077531&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryConfig.java Fri Mar  4 04:24:57 2011
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.taglibs.standard.extra.spath.Predicate;
+
+import org.mortbay.log.Log;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Test {@link JobTracker} startup with respect to {@link JobHistory} configuration parameters.
+ */
+public class TestJobHistoryConfig extends TestCase {
+  // private MiniMRCluster mr = null;
+  private MiniDFSCluster mdfs = null;
+  private String namenode = null;
+  FileSystem fileSys = null;
+  final Path inDir = new Path("./input");
+  final Path outDir = new Path("./output");
+
+  private void setUpCluster(JobConf conf) throws IOException,
+      InterruptedException {
+    mdfs = new MiniDFSCluster(conf, 1, true, null);
+    fileSys = mdfs.getFileSystem();
+    namenode = fileSys.getUri().toString();
+  }
+
+  /**
+   * Test case to make sure that JobTracker will start and JobHistory is enabled
+   * <ol>
+   * <li>Configure a valid job history location</li>
+   * <li>Check if JobTracker can start</li>
+   * </ol>
+   * 
+   * @throws Exception
+   */
+
+  public void testJobHistoryWithValidConfiguration() throws Exception {
+    try {
+      JobConf conf = new JobConf();
+      setUpCluster(conf);
+      conf.set("hadoop.job.history.location", "/hadoop/history");
+      conf = MiniMRCluster.configureJobConf(conf, namenode, 0, 0, null);
+      boolean started = canStartJobTracker(conf);
+      assertTrue(started);
+    } finally {
+      if (mdfs != null) {
+        try {
+          mdfs.shutdown();
+        } catch (Exception e) {
+        }
+      }
+    }
+  }
+
+  public static class MapperClass extends MapReduceBase implements
+      Mapper<LongWritable, Text, Text, IntWritable> {
+    public void configure(JobConf job) {
+    }
+
+    public void map(LongWritable key, Text value,
+        OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+      throw new IOException();
+    }
+  }
+
+  public void testJobHistoryLogging() throws Exception {
+    JobConf conf = new JobConf();
+    setUpCluster(conf);
+    conf.setMapperClass(MapperClass.class);
+    conf.setReducerClass(IdentityReducer.class);
+    conf.setNumReduceTasks(0);
+    JobClient jc = new JobClient(conf);
+    conf.set("hadoop.job.history.location", "/hadoop/history");
+    conf = MiniMRCluster.configureJobConf(conf, namenode, 0, 0, null);
+    FileSystem inFs = inDir.getFileSystem(conf);
+    if (!inFs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setSpeculativeExecution(false);
+    conf.setJobName("test");
+    conf.setUser("testuser");
+    conf.setQueueName("testQueue");
+    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+        "/tmp")).toString().replace(' ', '+');
+    JobTracker jt = JobTracker.startTracker(conf);
+    assertTrue(jt != null);
+    JobInProgress jip = new JobInProgress(new JobID("jt", 1),
+        new JobConf(conf), jt);
+    assertTrue(jip != null);
+    jip.jobFile = "testfile";
+    String historyFile = JobHistory.getHistoryFilePath(jip.getJobID());
+    JobHistory.JobInfo.logSubmitted(jip.getJobID(), jip.getJobConf(),
+        jip.jobFile, jip.startTime);
+  }
+
+  /**
+   * Check whether the JobTracker can be started.
+   * 
+   * @throws IOException
+   */
+  private boolean canStartJobTracker(JobConf conf) throws InterruptedException,
+      IOException {
+    JobTracker jt = null;
+    try {
+      jt = JobTracker.startTracker(conf);
+      Log.info("Started JobTracker");
+    } catch (IOException e) {
+      Log.info("Can not Start JobTracker", e.getLocalizedMessage());
+      return false;
+    }
+    if (jt != null) {
+      jt.fs.close();
+      jt.stopTracker();
+    }
+    return true;
+  }
+}



Mime
View raw message