hadoop-common-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r629369 [3/3] - in /hadoop/core/trunk: ./ conf/ docs/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/test/org/a...
Date: Wed, 20 Feb 2008 06:04:58 GMT
Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java Tue Feb 19 22:04:44 2008
@@ -23,6 +23,7 @@
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.dfs.MiniDFSCluster;
@@ -100,7 +101,9 @@
         job.go();
         String line = null;
         String line2 = null;
-        Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
+        Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
+                                     new Path(OUTPUT_DIR),
+                                     new OutputLogFilter()));
         for (int i = 0; i < fileList.length; i++){
           System.out.println(fileList[i].toString());
           BufferedReader bread =

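For reference, the new listing idiom used by these tests can be sketched as a
standalone program (a minimal sketch, assuming the 0.16-era APIs touched by
this commit; the class name and output path are illustrative):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.OutputLogFilter;

    public class ListOutput {
      public static void main(String[] args) throws Exception {
        Path outDir = new Path("/user/me/output");   // illustrative path
        FileSystem fs = outDir.getFileSystem(new Configuration());
        // Skip the _logs/ subtree that user-location history logging
        // now creates inside the job output directory.
        Path[] files = FileUtil.stat2Paths(
            fs.listStatus(outDir, new OutputLogFilter()));
        for (Path p : files) {
          BufferedReader in =
            new BufferedReader(new InputStreamReader(fs.open(p)));
          String line;
          while ((line = in.readLine()) != null) {
            System.out.println(line);
          }
          in.close();
        }
      }
    }
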
Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Tue Feb 19 22:04:44 2008
@@ -23,6 +23,7 @@
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.dfs.MiniDFSCluster;
@@ -91,7 +92,9 @@
         job = new StreamJob(argv, mayExit);      
         job.go();
         String line = null;
-        Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
+        Path[] fileList = FileUtil.stat2Paths(fileSys.listStatus(
+                                                new Path(OUTPUT_DIR),
+                                                new OutputLogFilter()));
         for (int i = 0; i < fileList.length; i++){
           System.out.println(fileList[i].toString());
           BufferedReader bread =

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/cluster_setup.xml Tue Feb 19 22:04:44 2008
@@ -365,6 +365,35 @@
           Commons Logging</a> framework for logging. Edit the 
           <code>conf/log4j.properties</code> file to customize the Hadoop 
           daemons' logging configuration (log-formats and so on).</p>
+          
+          <section>
+            <title>History Logging</title>
+            
+            <p>Job history files are stored in the central location
+            <code>hadoop.job.history.location</code>, which can also be
+            on DFS; its default value is
+            <code>${HADOOP_LOG_DIR}/history</code>.
+            The job history server is started on the JobTracker, and the
+            history web UI is accessible from the JobTracker web UI.</p>
+            
+            <p>The history files are also logged to the user-specified
+            directory <code>hadoop.job.history.user.location</code>,
+            which defaults to the job output directory. The files are
+            stored under "_logs/history/" in the specified directory;
+            hence, by default, they are in
+            "mapred.output.dir/_logs/history/". The user can disable this
+            logging by setting
+            <code>hadoop.job.history.user.location</code> to
+            <code>none</code>.</p>
+            
+            <p>The user can view the history logs in the specified
+            directory using the following command:<br/>
+            <code>$ bin/hadoop job -history output-dir</code><br/>
+            This starts a standalone Jetty server on the client and loads
+            the history JSPs. It prints the port on which the server is
+            listening; the server stays up for 30 minutes, and the
+            history can be viewed at <code>http://hostname:port</code>.
+            The HTTP bind address can also be set via
+            <code>mapred.job.history.http.bindAddress</code>.</p>
+          </section>
         </section>
       </section>
       

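The properties described above can also be set programmatically; a hedged
sketch (the property names come from this patch, while the values and class
name are made up):

    import org.apache.hadoop.mapred.JobConf;

    public class HistoryConfExample {
      public static JobConf configure() {
        JobConf conf = new JobConf();
        // Central history location (used by the JobTracker); may be on DFS.
        conf.set("hadoop.job.history.location", "/hadoop/logs/history");
        // Per-job user copy; defaults to the job output directory, where
        // files land under _logs/history/.  "none" disables this copy.
        conf.set("hadoop.job.history.user.location", "none");
        // Bind address for the standalone history web server
        // ("host:port"; port 0 means pick any free port).
        conf.set("mapred.job.history.http.bindAddress", "0.0.0.0:0");
        return conf;
      }
    }
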
Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Tue Feb 19 22:04:44 2008
@@ -1110,7 +1110,29 @@
            monitoring its status.
           </li>
         </ol>
-  
+        <p>Job history files are also logged to the user-specified
+        directory <code>hadoop.job.history.user.location</code>, which
+        defaults to the job output directory. The files are stored under
+        "_logs/history/" in the specified directory; hence, by default,
+        they are in "mapred.output.dir/_logs/history/". The user can
+        disable this logging by setting
+        <code>hadoop.job.history.user.location</code> to
+        <code>none</code>.</p>
+
+        <p>The user can view the history logs in the specified directory
+        using the following command:<br/>
+        <code>$ bin/hadoop job -history output-dir</code><br/>
+        This starts a standalone Jetty server on the client and loads the
+        history JSPs. It prints the port on which the server is
+        listening; the server stays up for 30 minutes, and the history
+        can be viewed at <code>http://hostname:port</code>. The HTTP bind
+        address can also be set via
+        <code>mapred.job.history.http.bindAddress</code>.</p>
+
+        <p>The user can use
+        <a href="ext:api/org/apache/hadoop/mapred/outputlogfilter">OutputLogFilter</a>
+        to filter log files from the output directory listing.</p>
+        
         <p>Normally the user creates the application, describes various facets 
         of the job via <code>JobConf</code>, and then uses the 
         <code>JobClient</code> to submit the job and monitor its progress.</p>

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml Tue Feb 19 22:04:44 2008
@@ -169,6 +169,7 @@
                 <setcompressoutput href="#setCompressOutput(org.apache.hadoop.mapred.JobConf,%20boolean)" />
                 <setoutputcompressorclass href="#setOutputCompressorClass(org.apache.hadoop.mapred.JobConf,%20java.lang.Class)" />
               </outputformatbase>
+              <outputlogfilter href="OutputLogFilter.html" />
               <sequencefileoutputformat href="SequenceFileOutputFormat.html">
                 <setoutputcompressiontype href="#setOutputCompressionType(org.apache.hadoop.mapred.JobConf,%20org.apache.hadoop.io.SequenceFile.CompressionType)" />
               </sequencefileoutputformat>

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/DefaultJobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/DefaultJobHistoryParser.java?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/DefaultJobHistoryParser.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/DefaultJobHistoryParser.java Tue Feb 19 22:04:44 2008
@@ -20,6 +20,8 @@
 
 import java.util.*;
 import java.io.*;
+
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobHistory.Keys; 
 import org.apache.hadoop.mapred.JobHistory.Values;
 
@@ -36,39 +38,19 @@
   // possible if it is a non-generic class.
 
   /**
-   * Contents of a job history file. Maps: 
-   * <xmp>jobTrackerId -> <jobId, JobHistory.JobInfo>*</xmp>
-   */
-  public static class MasterIndex 
-    extends TreeMap<String, Map<String, JobHistory.JobInfo>> {
-    
-  }
-
-  /**
-   * Parses a master index file and returns a {@link MasterIndex}.
-   * @param historyFile master index history file. 
-   * @return a {@link MasterIndex}.  
-   * @throws IOException
-   */
-  public static MasterIndex parseMasterIndex(File historyFile)
-    throws IOException {
-    MasterIndexParseListener parser = new MasterIndexParseListener();
-    JobHistory.parseHistory(historyFile, parser);
-
-    return parser.getValues();
-  }
-
-  /**
    * Populates a JobInfo object from the job's history log file. 
    * @param jobHistoryFile history file for this job. 
    * @param job a precreated JobInfo object, should be non-null. 
+   * @param fs FileSystem where historyFile is present. 
    * @throws IOException
    */
-  public static void parseJobTasks(File jobHistoryFile, JobHistory.JobInfo job)
+  public static void parseJobTasks(String jobHistoryFile, 
+                       JobHistory.JobInfo job, FileSystem fs)
     throws IOException {
-    JobHistory.parseHistory(jobHistoryFile, 
-                            new JobTasksParseListener(job));
+    JobHistory.parseHistoryFromFS(jobHistoryFile, 
+                            new JobTasksParseListener(job), fs);
   }
+  
   /**
    * Listener for Job's history log file, it populates JobHistory.JobInfo 
    * object with data from log file. 
@@ -144,48 +126,6 @@
     }
   }
 
-  /**
-   * Parses and returns a map of values in master index. 
-   * 
-   */
-  static class MasterIndexParseListener
-    implements JobHistory.Listener {
-    MasterIndex jobTrackerToJobs = new MasterIndex();
-
-    Map<String, JobHistory.JobInfo> activeJobs = null;
-    String currentTracker; 
-    
-    // Implement JobHistory.Listener
-
-    public void handle(JobHistory.RecordTypes recType, Map<Keys, String> values)
-      throws IOException {
- 
-      if (recType.equals(JobHistory.RecordTypes.Jobtracker)) {
-        activeJobs = new TreeMap<String, JobHistory.JobInfo>();
-        currentTracker = values.get(Keys.START_TIME);
-        jobTrackerToJobs.put(currentTracker, activeJobs);
-      } else if (recType.equals(JobHistory.RecordTypes.Job)) {
-        String jobId = values.get(Keys.JOBID);
-        JobHistory.JobInfo job = activeJobs.get(jobId);
-        if (null == job) {
-          job = new JobHistory.JobInfo(jobId);
-          job.set(Keys.JOBTRACKERID, currentTracker);
-          activeJobs.put(jobId, job);
-        }
-        job.handle(values);
-      }
-    }
-
-    /**
-     * Return map of parsed values. 
-     * @return
-     */ 
-    MasterIndex getValues() {
-      return jobTrackerToJobs;
-    }
-  }
-  
-  
   // call this only for jobs that succeeded for better results. 
   static class FailedOnNodesFilter implements JobHistory.Listener {
     private Map<String, Set<String>> badNodesToNumFailedTasks =

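A minimal sketch of driving the revised parser with its new signature (the
history file path and job id below are illustrative; parseJobTasks() now
takes the FileSystem on which the history file lives):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.DefaultJobHistoryParser;
    import org.apache.hadoop.mapred.JobHistory;

    public class ParseHistoryExample {
      public static void main(String[] args) throws Exception {
        String historyFile = args[0];   // e.g. a file under _logs/history/
        FileSystem fs =
          new Path(historyFile).getFileSystem(new Configuration());
        // parseJobTasks() expects a pre-created, non-null JobInfo.
        JobHistory.JobInfo job =
          new JobHistory.JobInfo("job_200802190001_0001");
        DefaultJobHistoryParser.parseJobTasks(historyFile, job, fs);
        System.out.println("Tasks parsed: " + job.getAllTasks().size());
      }
    }
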
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Feb 19 22:04:44 2008
@@ -1009,6 +1009,7 @@
     System.out.printf("\t-status\t<job-id>\n");
     System.out.printf("\t-kill\t<job-id>\n");
     System.out.printf("\t-events\t<job-id> <from-event-#> <#-of-events>\n");
+    System.out.printf("\t-history\t<jobOutputDir>\n");
     System.out.printf("\t-list\n");
     System.out.printf("\t-list\tall\n");
     System.out.printf("\t-kill-task <task-id>\n");
@@ -1022,11 +1023,13 @@
     String submitJobFile = null;
     String jobid = null;
     String taskid = null;
+    String outputDir = null;
     int fromEvent = 0;
     int nEvents = 0;
     boolean getStatus = false;
     boolean killJob = false;
     boolean listEvents = false;
+    boolean viewHistory = false;
     boolean listJobs = false;
     boolean listAllJobs = false;
     boolean killTask = false;
@@ -1056,6 +1059,11 @@
       fromEvent = Integer.parseInt(argv[2]);
       nEvents = Integer.parseInt(argv[3]);
       listEvents = true;
+    } else if ("-history".equals(argv[0])) {
+      if (argv.length != 2)
+        displayUsage();
+        outputDir = argv[1];
+        viewHistory = true;
     } else if ("-list".equals(argv[0])) {
       if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1])))
         displayUsage();
@@ -1112,6 +1120,10 @@
           System.out.println("Killed job " + jobid);
           exitCode = 0;
         }
+      } else if (viewHistory) {
+        // start http server
+        viewHistory(outputDir);
+        exitCode = 0;
       } else if (listEvents) {
         listEvents(jobid, fromEvent, nEvents);
         exitCode = 0;
@@ -1144,6 +1156,45 @@
     return exitCode;
   }
 
+  private void viewHistory(String outputDir) 
+    throws IOException {
+
+    Path output = new Path(outputDir);
+    FileSystem fs = output.getFileSystem(getConf());
+
+    // start http server used to provide an HTML view on Job history
+    StatusHttpServer infoServer;
+    String infoAddr = new JobConf(getConf()).get(
+             "mapred.job.history.http.bindAddress", "0.0.0.0:0");
+    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
+    String infoBindAddress = infoSocAddr.getHostName();
+    int tmpInfoPort = infoSocAddr.getPort();
+    infoServer = new StatusHttpServer("history", infoBindAddress, tmpInfoPort,
+                                       tmpInfoPort == 0);
+    infoServer.setAttribute("fileSys", fs);
+    infoServer.setAttribute("historyLogDir", outputDir + "/_logs/history");
+    infoServer.start();
+    int infoPort = infoServer.getPort();
+    getConf().set("mapred.job.history.http.bindAddress", 
+        infoBindAddress + ":" + infoPort);
+    LOG.info("JobHistory webserver up at: " + infoPort);
+
+    // let the server be up for 30 minutes.
+    try {
+      Thread.sleep(30 * 60 * 1000);
+    } catch (InterruptedException ie) {}
+      
+    // stop infoServer
+    if (infoServer != null) {
+      LOG.info("Stopping infoServer");
+      try {
+        infoServer.stop();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    } 
+  }
+  
   /**
    * List the events for the given job
    * @param jobId the job id for the job's events to list

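The same viewer can also be driven from code; a hypothetical equivalent of
"bin/hadoop job -history <jobOutputDir>" (class name and path are
illustrative; JobClient.main() dispatches to the -history branch added
above):

    import org.apache.hadoop.mapred.JobClient;

    public class HistoryViewer {
      public static void main(String[] args) throws Exception {
        // Starts the standalone Jetty history server for the given job
        // output directory; it stays up for 30 minutes.
        JobClient.main(new String[] { "-history", "/user/me/wordcount-out" });
      }
    }
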
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java Tue Feb 19 22:04:44 2008
@@ -24,7 +24,9 @@
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -34,7 +36,12 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
+
 /**
  * Provides methods for writing to and reading from job history. 
  * Job History works in an append mode, JobHistory and its inner classes provide methods 
@@ -60,13 +67,14 @@
   private static final String VALUE = "[[^\"]?]+"; // anything but a " in ""
   
   private static final Pattern pattern = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
+  private static final int MAX_FILENAME_SIZE = 255;
   
-  public static final String JOBTRACKER_START_TIME = String.valueOf(System.currentTimeMillis()); 
-  private static final String LOG_DIR = System.getProperty("hadoop.log.dir") + File.separator + "history"; 
-  public static final String MASTER_INDEX_LOG_FILE = "JobHistory.log"; 
-  
-  private static PrintWriter masterIndex = null;
-  private static Map<String, PrintWriter> openJobs = new HashMap<String, PrintWriter>(); 
+  public static final String JOBTRACKER_START_TIME =
+                               String.valueOf(System.currentTimeMillis());
+  private static String JOBTRACKER_UNIQUE_STRING = null;
+  private static String LOG_DIR = null;
+  private static Map<String, ArrayList<PrintWriter>> openJobs = 
+                     new HashMap<String, ArrayList<PrintWriter>>();
   private static boolean disableHistory = false; 
   /**
    * Record types are identifiers for each line of log in history files. 
@@ -101,27 +109,23 @@
  // temp buffer for parsed data
   private static Map<Keys,String> parseBuffer = new HashMap<Keys, String>(); 
 
-  // init log files
-  static { init(); } 
-  
   /**
    * Initialize JobHistory files. 
    *
    */
-  private static void init(){
+  public static void init(JobConf conf, String hostname){
     if (!disableHistory){
       try{
-        File logDir = new File(LOG_DIR); 
-        if (!logDir.exists()){
-          if (!logDir.mkdirs()){
+        LOG_DIR = conf.get("hadoop.job.history.location");
+        JOBTRACKER_UNIQUE_STRING = hostname + "_" + 
+                                   JOBTRACKER_START_TIME + "_";
+        Path logDir = new Path(LOG_DIR);
+        FileSystem fs = logDir.getFileSystem(conf);
+        if (!fs.exists(logDir)){
+          if (!fs.mkdirs(logDir)){
             throw new IOException("Mkdirs failed to create " + logDir.toString());
           }
         }
-        masterIndex = 
-          new PrintWriter(
-                          new FileOutputStream(new File(LOG_DIR + File.separator + MASTER_INDEX_LOG_FILE), true));
-        // add jobtracker id = tracker start time
-        log(masterIndex, RecordTypes.Jobtracker, Keys.START_TIME, JOBTRACKER_START_TIME);  
       }catch(IOException e){
         LOG.error("Failed to initialize JobHistory log file", e); 
         disableHistory = true; 
@@ -129,17 +133,19 @@
     }
   }
 
-
   /**
-   * Parses history file and invokes Listener.handle() for each line of history. It can 
-   * be used for looking through history files for specific items without having to keep 
-   * whlole history in memory. 
+   * Parses history file and invokes Listener.handle() for 
+   * each line of history. It can be used for looking through history
+   * files for specific items without having to keep whole history in memory. 
    * @param path path to history file
    * @param l Listener for history events 
+   * @param fs FileSystem where history file is present
    * @throws IOException
    */
-  public static void parseHistory(File path, Listener l) throws IOException{
-    BufferedReader reader = new BufferedReader(new FileReader(path));
+  public static void parseHistoryFromFS(String path, Listener l, FileSystem fs)
+  throws IOException{
+    FSDataInputStream in = fs.open(new Path(path));
+    BufferedReader reader = new BufferedReader(new InputStreamReader (in));
     try {
       String line = null; 
       StringBuffer buf = new StringBuffer(); 
@@ -155,6 +161,7 @@
       try { reader.close(); } catch (IOException ex) {}
     }
   }
+
   /**
    * Parse a single line of history. 
    * @param line
@@ -203,8 +210,8 @@
    * @param values type of log event
    */
 
-  static void log(PrintWriter out, RecordTypes recordType, Keys[] keys, 
-                  String[] values){
+  static void log(ArrayList<PrintWriter> writers, RecordTypes recordType, 
+                  Keys[] keys, String[] values) {
     StringBuffer buf = new StringBuffer(recordType.name()); 
     buf.append(DELIMITER); 
     for(int i =0; i< keys.length; i++){
@@ -215,8 +222,10 @@
       buf.append(DELIMITER); 
     }
     
-    out.println(buf.toString());
-    out.flush(); 
+    for (PrintWriter out : writers) {
+      out.println(buf.toString());
+      out.flush();
+    }
   }
   
   /**
@@ -331,7 +340,8 @@
      * @return the path of the job file on the local file system 
      */
     public static String getLocalJobFilePath(String jobId){
-      return LOG_DIR + File.separator + jobId + "_conf.xml";
+      return System.getProperty("hadoop.log.dir") + File.separator +
+               jobId + "_conf.xml";
     }
     
     /**
@@ -347,23 +357,63 @@
                                     String jobConfPath, long submitTime) {
       String jobName = jobConf.getJobName();
       String user = jobConf.getUser(); 
+      FileSystem fs = null;
+      String userLogDir = null;
+      String jobUniqueString = JOBTRACKER_UNIQUE_STRING + jobId;
+
       if (!disableHistory){
-        synchronized(MASTER_INDEX_LOG_FILE){
-          JobHistory.log(masterIndex, RecordTypes.Job, 
-                         new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF }, 
-                         new String[]{jobId, jobName, user, 
-                                      String.valueOf(submitTime), jobConfPath}
-                        );
-        }
         // setup the history log file for this job
-        String logFileName =  JOBTRACKER_START_TIME + "_" + jobId; 
-        File logFile = new File(LOG_DIR + File.separator + logFileName);
-        
+        String logFileName = jobUniqueString +
+                             "_" + user + "_" + jobName;
+        if (logFileName.length() > MAX_FILENAME_SIZE) {
+          logFileName = logFileName.substring(0, MAX_FILENAME_SIZE-1);
+        }
+
+        // find user log directory 
+        Path outputPath = jobConf.getOutputPath();
+        userLogDir = jobConf.get("hadoop.job.history.user.location",
+                                 outputPath == null ? null : outputPath.toString());
+        if ("none".equals(userLogDir)) {
+          userLogDir = null;
+        }
+        if (userLogDir != null) {
+          userLogDir = userLogDir + "/_logs/history";
+        }
+
+        String logFile = null;
+        String userLogFile = null;
+        if (LOG_DIR != null ) {
+          logFile = LOG_DIR + File.separator + logFileName;
+        }
+        if (userLogDir != null ) {
+          userLogFile = userLogDir + File.separator + logFileName;
+        }
+
         try{
-          PrintWriter writer = new PrintWriter(logFile);
-          openJobs.put(logFileName, writer);
-          // add to writer as well 
-          JobHistory.log(writer, RecordTypes.Job, 
+          ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
+          FSDataOutputStream out = null;
+          PrintWriter writer = null;
+
+          if (LOG_DIR != null) {
+            // create output stream for logging in hadoop.job.history.location
+            fs = new Path(LOG_DIR).getFileSystem(jobConf);
+            out = fs.create(new Path(logFile), true, 4096);
+            writer = new PrintWriter(out);
+            writers.add(writer);
+          }
+          if (userLogDir != null) {
+            // create output stream for logging 
+            // in hadoop.job.history.user.location
+            fs = new Path(userLogDir).getFileSystem(jobConf);
+            out = fs.create(new Path(userLogFile), true, 4096);
+            writer = new PrintWriter(out);
+            writers.add(writer);
+          }
+
+          openJobs.put(jobUniqueString, writers);
+
+          //add to writer as well 
+          JobHistory.log(writers, RecordTypes.Job, 
                          new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF }, 
                          new String[]{jobId, jobName, user, 
                                       String.valueOf(submitTime) , jobConfPath}
@@ -374,7 +424,7 @@
           disableHistory = true; 
         }
       }
-      /* Storing the job conf on the local file system */
+      // Always store job conf on local file system 
       String localJobFilePath =  JobInfo.getLocalJobFilePath(jobId); 
       File localJobFile = new File(localJobFilePath);
       FileOutputStream jobOut = null;
@@ -393,10 +443,53 @@
             jobOut.close();
           } catch (IOException ie) {
             LOG.info("Failed to close the job configuration file " 
-                     + StringUtils.stringifyException(ie));
+                       + StringUtils.stringifyException(ie));
           }
         }
       }
+
+      /* Storing the job conf on the log dir */
+      Path jobFilePath = null;
+      if (LOG_DIR != null) {
+        jobFilePath = new Path(LOG_DIR + File.separator + 
+                               jobUniqueString + "_conf.xml");
+      }
+      Path userJobFilePath = null;
+      if (userLogDir != null) {
+        userJobFilePath = new Path(userLogDir + File.separator +
+                                   jobUniqueString + "_conf.xml");
+      }
+      FSDataOutputStream jobFileOut = null;
+      try {
+        if (LOG_DIR != null) {
+          fs = new Path(LOG_DIR).getFileSystem(jobConf);
+          if (!fs.exists(jobFilePath)) {
+            jobFileOut = fs.create(jobFilePath);
+            jobConf.write(jobFileOut);
+            jobFileOut.close();
+          }
+        } 
+        if (userLogDir != null) {
+          fs = new Path(userLogDir).getFileSystem(jobConf);
+          jobFileOut = fs.create(userJobFilePath);
+          jobConf.write(jobFileOut);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Job conf for " + jobId + " stored at " 
+                    + jobFilePath + "and" + userJobFilePath );
+        }
+      } catch (IOException ioe) {
+        LOG.error("Failed to store job conf on the local filesystem ", ioe);
+      } finally {
+        if (jobFileOut != null) {
+          try {
+            jobFileOut.close();
+          } catch (IOException ie) {
+            LOG.info("Failed to close the job configuration file " 
+                     + StringUtils.stringifyException(ie));
+          }
+        }
+      } 
     }
     /**
      * Logs launch time of job. 
@@ -407,16 +500,9 @@
      */
     public static void logStarted(String jobId, long startTime, int totalMaps, int totalReduces){
       if (!disableHistory){
-        synchronized(MASTER_INDEX_LOG_FILE){
-          JobHistory.log(masterIndex, RecordTypes.Job, 
-                         new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
-                         new String[] {jobId,  String.valueOf(startTime), 
-                                       String.valueOf(totalMaps), String.valueOf(totalReduces) }); 
-        }
-        
-        String logFileName =  JOBTRACKER_START_TIME + "_" + jobId; 
-        PrintWriter writer = openJobs.get(logFileName); 
-        
+        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
+        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job, 
                          new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
@@ -439,20 +525,10 @@
                                    int failedMaps, int failedReduces,
                                    Counters counters){
       if (!disableHistory){
-        synchronized(MASTER_INDEX_LOG_FILE){
-          JobHistory.log(masterIndex, RecordTypes.Job,          
-                         new Keys[] {Keys.JOBID, Keys.FINISH_TIME, 
-                                     Keys.JOB_STATUS, Keys.FINISHED_MAPS, 
-                                     Keys.FINISHED_REDUCES},
-                         new String[] {jobId,  "" + finishTime, 
-                                       Values.SUCCESS.name(), 
-                                       String.valueOf(finishedMaps), 
-                                       String.valueOf(finishedReduces)});
-        }
-        
         // close job file for this job
-        String logFileName = JOBTRACKER_START_TIME + "_" + jobId; 
-        PrintWriter writer = openJobs.get(logFileName); 
+        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
+        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,          
                          new Keys[] {Keys.JOBID, Keys.FINISH_TIME, 
@@ -467,8 +543,10 @@
                                        String.valueOf(failedMaps), 
                                        String.valueOf(failedReduces),
                                        counters.makeCompactString()});
-          writer.close();
-          openJobs.remove(logFileName); 
+          for (PrintWriter out : writer) {
+            out.close();
+          }
+          openJobs.remove(logFileKey); 
         }
         Thread historyCleaner  = new Thread(new HistoryCleaner());
         historyCleaner.start(); 
@@ -483,21 +561,18 @@
      */
     public static void logFailed(String jobid, long timestamp, int finishedMaps, int finishedReduces){
       if (!disableHistory){
-        synchronized(MASTER_INDEX_LOG_FILE){
-          JobHistory.log(masterIndex, RecordTypes.Job,
-                         new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
-                         new String[] {jobid,  String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps), 
-                                       String.valueOf(finishedReduces)}); 
-        }
-        String logFileName =  JOBTRACKER_START_TIME + "_" + jobid; 
-        PrintWriter writer = openJobs.get(logFileName); 
+        String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
+        ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,
                          new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
                          new String[] {jobid,  String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps), 
                                        String.valueOf(finishedReduces)}); 
-          writer.close();
-          openJobs.remove(logFileName); 
+          for (PrintWriter out : writer) {
+            out.close();
+          }
+          openJobs.remove(logFileKey); 
         }
       }
     }
@@ -520,7 +595,9 @@
     public static void logStarted(String jobId, String taskId, String taskType, 
                                   long startTime){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
+                                                     + jobId); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME}, 
@@ -538,7 +615,9 @@
     public static void logFinished(String jobId, String taskId, String taskType, 
                                    long finishTime, Counters counters){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
+                                                     + jobId); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
@@ -560,7 +639,9 @@
      */
     public static void logFailed(String jobId, String taskId, String taskType, long time, String error){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId); 
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
+                                                     + jobId); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
@@ -597,7 +678,9 @@
      */
     public static void logStarted(String jobId, String taskId, String taskAttemptId, long startTime, String hostName){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
+                                                     + jobId); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
@@ -620,7 +703,9 @@
                                    String taskAttemptId, long finishTime, 
                                    String hostName){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
+                                                     + jobId); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
@@ -644,7 +729,9 @@
     public static void logFailed(String jobId, String taskId, String taskAttemptId, 
                                  long timestamp, String hostName, String error){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
+                                                     + jobId); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
@@ -666,7 +753,9 @@
     public static void logKilled(String jobId, String taskId, String taskAttemptId, 
                                  long timestamp, String hostName, String error){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
+                                                     + jobId); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
@@ -693,7 +782,9 @@
     public static void logStarted(String jobId, String taskId, String taskAttemptId, 
                                   long startTime, String hostName){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
+                                                     + jobId); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
@@ -718,7 +809,9 @@
                                    long sortFinished, long finishTime, 
                                    String hostName){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
+                                                     + jobId); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
@@ -743,7 +836,9 @@
     public static void logFailed(String jobId, String taskId, String taskAttemptId, long timestamp, 
                                  String hostName, String error){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
+                                                     + jobId); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
@@ -765,7 +860,9 @@
     public static void logKilled(String jobId, String taskId, String taskAttemptId, long timestamp, 
                                  String hostName, String error){
       if (!disableHistory){
-        PrintWriter writer = openJobs.get(JOBTRACKER_START_TIME + "_" + jobId);
+        ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
+                                                     + jobId); 
+
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
@@ -822,61 +919,6 @@
       }
       lastRan = now;  
       isRunning = true; 
-      // update master Index first
-      try{
-        File logFile = new File(
-                                LOG_DIR + File.separator + MASTER_INDEX_LOG_FILE); 
-        
-        synchronized(MASTER_INDEX_LOG_FILE){
-          Map<String, Map<String, JobHistory.JobInfo>> jobTrackersToJobs = 
-            DefaultJobHistoryParser.parseMasterIndex(logFile);
-          
-          // find job that started more than one month back and remove them
-          // for jobtracker instances which dont have a job in past one month 
-          // remove the jobtracker start timestamp as well.
-          Iterator<Map<String, JobHistory.JobInfo>> jobTrackerItr =
-            jobTrackersToJobs.values().iterator();
-          while (jobTrackerItr.hasNext()) {
-            Map<String, JobHistory.JobInfo> jobs = jobTrackerItr.next();
-            Iterator<Map.Entry<String, JobHistory.JobInfo>> jobItr = 
-                   jobs.entrySet().iterator();
-            while (jobItr.hasNext()) {
-              Map.Entry<String, JobHistory.JobInfo> item = jobItr.next();
-              if (now - item.getValue().getLong(Keys.SUBMIT_TIME) > 
-                  THIRTY_DAYS_IN_MS) {
-                jobItr.remove(); 
-              }
-            }
-            if (jobs.size() == 0){
-              jobTrackerItr.remove(); 
-            }
-          }
-          masterIndex.close(); 
-          masterIndex = new PrintWriter(logFile);
-          // delete old history and write back to a new file
-          for (Map.Entry<String, Map<String, JobHistory.JobInfo>> entry :
-                  jobTrackersToJobs.entrySet()) {
-            String jobTrackerId = entry.getKey();
-            Map<String, JobHistory.JobInfo> jobs = entry.getValue();
-
-            
-            log(masterIndex, RecordTypes.Jobtracker, Keys.START_TIME, jobTrackerId);
-
-            for(JobHistory.JobInfo job : jobs.values()){
-              Map<Keys, String> values = job.getValues();
-              
-              log(masterIndex, RecordTypes.Job, 
-                  values.keySet().toArray(new Keys[0]), 
-                  values.values().toArray(new String[0])); 
-
-            }
-            masterIndex.flush();
-          }
-        }
-      }catch(IOException e){
-        LOG.error("Failed loading history log for cleanup", e);
-      }
-      
       File[] oldFiles = new File(LOG_DIR).listFiles(new FileFilter(){
           public boolean accept(File file){
             // delete if older than 30 days

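The writer fan-out that replaces the old single master-index writer (one
record written to every open PrintWriter for the job) can be illustrated in
isolation; a toy sketch, not Hadoop code:

    import java.io.PrintWriter;
    import java.io.StringWriter;
    import java.util.ArrayList;

    public class FanOutDemo {
      // Write one formatted record to every open writer, flushing each --
      // the same pattern JobHistory.log() now uses for the central and
      // user-location history files.
      static void log(ArrayList<PrintWriter> writers, String record) {
        for (PrintWriter out : writers) {
          out.println(record);
          out.flush();
        }
      }

      public static void main(String[] args) {
        ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
        StringWriter central = new StringWriter();
        StringWriter user = new StringWriter();
        writers.add(new PrintWriter(central));
        writers.add(new PrintWriter(user));
        log(writers, "Job JOBID=\"job_0001\" JOB_STATUS=\"SUCCESS\"");
        System.out.print(central);   // both buffers now hold the record
      }
    }
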
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Feb 19 22:04:44 2008
@@ -593,6 +593,7 @@
 
   // Used to provide an HTML view on Job, Task, and TaskTracker structures
   StatusHttpServer infoServer;
+  StatusHttpServer historyServer;
   int infoPort;
 
   Server interTrackerServer;
@@ -701,6 +702,26 @@
       Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
     }
 
+    // start history viewing server.
+    JobHistory.init(conf, this.localMachine); 
+    String histAddr = conf.get("mapred.job.history.http.bindAddress",
+                                  "0.0.0.0:0");
+    InetSocketAddress historySocAddr = NetUtils.createSocketAddr(histAddr);
+    String historyBindAddress = historySocAddr.getHostName();
+    int tmpHistoryPort = historySocAddr.getPort();
+    historyServer = new StatusHttpServer("history", historyBindAddress, 
+                       tmpHistoryPort, tmpHistoryPort == 0);
+    String historyLogDir = conf.get("hadoop.job.history.location");
+    historyServer.setAttribute("historyLogDir", historyLogDir);
+    FileSystem fileSys = new Path(historyLogDir).getFileSystem(conf);
+    historyServer.setAttribute("fileSys", fileSys);
+    historyServer.start();
+    this.conf.set("mapred.job.history.http.bindAddress", 
+                (this.localMachine + ":" + historyServer.getPort()));
+    LOG.info("JobHistory webserver on JobTracker up at: " +
+              historyServer.getPort());
+
+
     // Same with 'localDir' except it's always on the local disk.
     jobConf.deleteLocalFiles(SUBDIR);
     synchronized (this) {
@@ -719,6 +740,9 @@
     return NetUtils.createSocketAddr(jobTrackerStr);
   }
 
+  public String getHistoryAddress() {
+    return conf.get("mapred.job.history.http.bindAddress");
+  }
 
   /**
    * Run forever
@@ -750,6 +774,14 @@
       LOG.info("Stopping infoServer");
       try {
         this.infoServer.stop();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    }
+    if (this.historyServer != null) {
+      LOG.info("Stopping historyServer");
+      try {
+        this.historyServer.stop();
       } catch (InterruptedException ex) {
         ex.printStackTrace();
       }

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputLogFilter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputLogFilter.java?rev=629369&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputLogFilter.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputLogFilter.java Tue Feb 19 22:04:44 2008
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * This class filters log files from the given directory.
+ * It does not accept paths containing <code>_logs</code>.
+ * It can be used to list the paths of an output directory as follows:
+ *   Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+ *                                   new OutputLogFilter()));
+ */
+public class OutputLogFilter implements PathFilter {
+  public boolean accept(Path path) {
+    return !(path.toString().contains("_logs"));
+  }
+}

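The filter's contract is easy to check directly; a small sketch (class name
and paths are illustrative):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.OutputLogFilter;

    public class FilterDemo {
      public static void main(String[] args) {
        OutputLogFilter filter = new OutputLogFilter();
        // Regular output files are accepted...
        System.out.println(filter.accept(new Path("/out/part-00000")));      // true
        // ...while anything under the _logs subtree is rejected.
        System.out.println(filter.accept(new Path("/out/_logs/history/f"))); // false
      }
    }
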
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestClusterMapReduceTestCase.java Tue Feb 19 22:04:44 2008
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -61,8 +62,9 @@
 
     JobClient.runJob(conf);
 
-    Path[] outputFiles = getFileSystem().listPaths(getOutputDir());
-
+    Path[] outputFiles = FileUtil.stat2Paths(
+                           getFileSystem().listStatus(getOutputDir(),
+                           new OutputLogFilter()));
     if (outputFiles.length > 0) {
       InputStream is = getFileSystem().open(outputFiles[0]);
       BufferedReader reader = new BufferedReader(new InputStreamReader(is));

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Tue Feb 19 22:04:44 2008
@@ -24,6 +24,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -76,7 +77,8 @@
     StringBuffer result = new StringBuffer();
     {
       Path[] parents = fs.listPaths(outDir.getParent());
-      Path[] fileList = fs.listPaths(outDir);
+      Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+              new OutputLogFilter()));
       for(int i=0; i < fileList.length; ++i) {
         BufferedReader file = 
           new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
@@ -130,8 +132,8 @@
     conf.setJar("build/test/testjar/testjob.jar");
     JobClient.runJob(conf);
     StringBuffer result = new StringBuffer();
-
-    Path[] fileList = fs.listPaths(outDir);
+    Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+                                 new OutputLogFilter()));
     for (int i = 0; i < fileList.length; ++i) {
       BufferedReader file = new BufferedReader(new InputStreamReader(
                                                                      fs.open(fileList[i])));

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Tue Feb 19 22:04:44 2008
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -91,8 +92,11 @@
     FileSystem fs = outDir.getFileSystem(conf);
     StringBuffer result = new StringBuffer();
     {
-      Path[] fileList = fs.listPaths(outDir);
+      
+      Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
+                                   new OutputLogFilter()));
       for(int i=0; i < fileList.length; ++i) {
+        LOG.info("File list[" + i + "]" + ": "+ fileList[i]);
         BufferedReader file = 
           new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
         String line = file.readLine();

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/pipes/TestPipes.java Tue Feb 19 22:04:44 2008
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.OutputLogFilter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TestMiniMRWithDFS;
 import org.apache.hadoop.util.StringUtils;
@@ -144,7 +145,8 @@
     RunningJob result = Submitter.submitJob(job);
     assertTrue("pipes job failed", result.isSuccessful());
     List<String> results = new ArrayList<String>();
-    for (Path p:fs.listPaths(outputPath)) {
+    for (Path p:FileUtil.stat2Paths(fs.listStatus(outputPath,
+    		                        new OutputLogFilter()))) {
       results.add(TestMiniMRWithDFS.readOutput(p, job));
     }
     assertEquals("number of reduces is wrong", 

Added: hadoop/core/trunk/src/webapps/history/analysejobhistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/history/analysejobhistory.jsp?rev=629369&view=auto
==============================================================================
--- hadoop/core/trunk/src/webapps/history/analysejobhistory.jsp (added)
+++ hadoop/core/trunk/src/webapps/history/analysejobhistory.jsp Tue Feb 19 22:04:44 2008
@@ -0,0 +1,247 @@
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.http.*"
+  import="java.io.*"
+  import="java.util.*"
+  import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.util.*"
+  import="java.text.SimpleDateFormat"
+  import="org.apache.hadoop.mapred.JobHistory.*"
+%>
+<jsp:include page="loadhistory.jsp">
+	<jsp:param name="jobid" value="<%=request.getParameter("jobid") %>"/>
+	<jsp:param name="logFile" value="<%=request.getParameter("logFile") %>"/>
+</jsp:include>
+<%!	private static SimpleDateFormat dateFormat = new SimpleDateFormat("d/MM HH:mm:ss") ; %>
+<html><body>
+<%
+	String jobid = request.getParameter("jobid");
+	String logFile = request.getParameter("logFile");
+	String numTasks = request.getParameter("numTasks");
+	int showTasks = 10 ; 
+	if( numTasks != null ) {
+	  showTasks = Integer.parseInt(numTasks);  
+	}
+
+	JobInfo job = (JobInfo)request.getSession().getAttribute("job");
+
+%>
+<h2>Hadoop Job <a href="jobdetailshistory.jsp?jobid=<%=jobid%>&&logFile=<%=logFile %>"><%=jobid %> </a></h2>
+
+<b>User : </b> <%=job.get(Keys.USER) %><br/> 
+<b>JobName : </b> <%=job.get(Keys.JOBNAME) %><br/> 
+<b>JobConf : </b> <%=job.get(Keys.JOBCONF) %><br/> 
+<b>Submitted At : </b> <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getLong(Keys.SUBMIT_TIME), 0 ) %><br/> 
+<b>Launched At : </b> <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getLong(Keys.LAUNCH_TIME), job.getLong(Keys.SUBMIT_TIME)) %><br/>
+<b>Finished At : </b>  <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getLong(Keys.FINISH_TIME), job.getLong(Keys.LAUNCH_TIME)) %><br/>
+<b>Status : </b> <%= ((job.get(Keys.JOB_STATUS) == null)?"Incomplete" :job.get(Keys.JOB_STATUS)) %><br/> 
+<hr/>
+<center>
+<%
+	if( ! Values.SUCCESS.name().equals(job.get(Keys.JOB_STATUS)) ){
+	  out.print("<h3>No Analysis available as job did not finish</h3>");
+	  return ;
+	}
+	Map<String, JobHistory.Task> tasks = job.getAllTasks();
+	int finishedMaps = job.getInt(Keys.FINISHED_MAPS)  ;
+	int finishedReduces = job.getInt(Keys.FINISHED_REDUCES) ;
+	JobHistory.Task [] mapTasks = new JobHistory.Task[finishedMaps]; 
+	JobHistory.Task [] reduceTasks = new JobHistory.Task[finishedReduces]; 
+	int mapIndex = 0 , reduceIndex=0; 
+	long avgMapTime = 0;
+	long avgReduceTime = 0;
+	long avgShuffleTime = 0;
+	
+	for( JobHistory.Task task : tasks.values() ) {
+	  long avgFinishTime = (task.getLong(Keys.FINISH_TIME) - 
+                      task.getLong(Keys.START_TIME));
+	  if( Values.MAP.name().equals(task.get(Keys.TASK_TYPE)) ){
+		  mapTasks[mapIndex++] = task ; 
+		  avgMapTime += avgFinishTime;
+	  }else{ 
+	    Map<String, TaskAttempt> attempts = task.getTaskAttempts();
+	    for (JobHistory.TaskAttempt attempt : attempts.values()) {
+	      if (attempt.get(Keys.TASK_STATUS).equals(Values.SUCCESS.name())) {
+	        reduceTasks[reduceIndex++] = attempt;
+   	        avgShuffleTime += (attempt.getLong(Keys.SHUFFLE_FINISHED) - 
+                               attempt.getLong(Keys.START_TIME));
+   	        avgReduceTime += (attempt.getLong(Keys.FINISH_TIME) - 
+                              attempt.getLong(Keys.SHUFFLE_FINISHED));
+   	        break;
+	      }
+	    }
+	  }
+	}
+	 
+	if (finishedMaps > 0) {
+	  avgMapTime /= finishedMaps;
+	}
+	if (finishedReduces > 0) {
+	  avgReduceTime /= finishedReduces;
+	  avgShuffleTime /= finishedReduces;
+	}
+	Comparator<JobHistory.Task> cMap = new Comparator<JobHistory.Task>(){
+	  public int compare(JobHistory.Task t1, JobHistory.Task t2){
+	    long l1 = t1.getLong(Keys.FINISH_TIME) - t1.getLong(Keys.START_TIME); 
+	    long l2 = t2.getLong(Keys.FINISH_TIME) - t2.getLong(Keys.START_TIME);
+      return (l2<l1 ? -1 : (l2==l1 ? 0 : 1));
+	  }
+	}; 
+	Comparator<JobHistory.Task> cShuffle = new Comparator<JobHistory.Task>(){
+	  public int compare(JobHistory.Task t1, JobHistory.Task t2){
+	    long l1 = t1.getLong(Keys.SHUFFLE_FINISHED) - 
+	                       t1.getLong(Keys.START_TIME); 
+	    long l2 = t2.getLong(Keys.SHUFFLE_FINISHED) - 
+	                       t2.getLong(Keys.START_TIME);
+      return (l2<l1 ? -1 : (l2==l1 ? 0 : 1));
+	  }
+	}; 
+	Arrays.sort(mapTasks, cMap);
+	Arrays.sort(reduceTasks, cShuffle); 
+	
+	JobHistory.Task minMap = mapTasks[mapTasks.length-1] ;
+	JobHistory.Task minShuffle = reduceTasks[reduceTasks.length-1] ;
+	
+%>
+
+<h3>Time taken by best performing Map task 
+<a href="taskdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile%>&taskid=<%=minMap.get(Keys.TASKID)%>">
+<%=minMap.get(Keys.TASKID) %></a> : <%=StringUtils.formatTimeDiff(minMap.getLong(Keys.FINISH_TIME), minMap.getLong(Keys.START_TIME) ) %></h3>
+<h3>Average time taken by Map tasks: 
+<%=StringUtils.formatTimeDiff(avgMapTime, 0) %></h3>
+<h3>Worst performing map tasks</h3>
+<table border="2" cellpadding="5" cellspacing="2">
+<tr><td>Task Id</td><td>Time taken</td></tr>
+<%
+	for( int i=0;i<showTasks && i<mapTasks.length; i++){
+%>
+		<tr>
+			<td><a href="taskdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile%>&taskid=<%=mapTasks[i].get(Keys.TASKID)%>">
+  		    <%=mapTasks[i].get(Keys.TASKID) %></a></td>
+			<td><%=StringUtils.formatTimeDiff(mapTasks[i].getLong(Keys.FINISH_TIME), mapTasks[i].getLong(Keys.START_TIME)) %></td>
+		</tr>
+<%
+	}
+%>
+</table>
+<%  
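+    // Re-sort by absolute finish time to find the map task that finished last.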
+    Comparator<JobHistory.Task> cFinishMapRed = 
+      new Comparator<JobHistory.Task>() {
+      public int compare(JobHistory.Task t1, JobHistory.Task t2){
+        long l1 = t1.getLong(Keys.FINISH_TIME); 
+        long l2 = t2.getLong(Keys.FINISH_TIME);
+        return (l2<l1 ? -1 : (l2==l1 ? 0 : 1));
+      }
+    };
+    Arrays.sort(mapTasks, cFinishMapRed);
+    JobHistory.Task lastMap = mapTasks[0] ;
+%>
+<h3>The last Map task 
+<a href="taskdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile%>&taskid=<%=lastMap.get(Keys.TASKID)%>"><%=lastMap.get(Keys.TASKID) %></a> 
+finished at (relative to the Job launch time): 
+<%=StringUtils.getFormattedTimeWithDiff(dateFormat, 
+                              lastMap.getLong(Keys.FINISH_TIME), 
+                              job.getLong(Keys.LAUNCH_TIME) ) %></h3>
+<hr/>
+<h3>Time taken by best performing shuffle
+<a href="taskdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile%>&taskid=<%=minShuffle.get(Keys.TASKID)%>"><%=minShuffle.get(Keys.TASKID)%></a> : 
+<%=StringUtils.formatTimeDiff(minShuffle.getLong(Keys.SHUFFLE_FINISHED), 
+                              minShuffle.getLong(Keys.START_TIME) ) %></h3>
+<h3>Average time taken by Shuffle: 
+<%=StringUtils.formatTimeDiff(avgShuffleTime, 0) %></h3>
+<h3>Worst performing shuffles</h3>
+<table border="2" cellpadding="5" cellspacing="2">
+<tr><td>Task Id</td><td>Time taken</td></tr>
+<%
+	for( int i=0;i<showTasks && i<reduceTasks.length; i++){
+%>
+      <tr>
+	    <td><a href="taskdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile%>&taskid=<%=reduceTasks[i].get(Keys.TASKID)%>">
+			<%=reduceTasks[i].get(Keys.TASKID) %></a></td>
+	    <td><%=
+	      StringUtils.formatTimeDiff(
+	                       reduceTasks[i].getLong(Keys.SHUFFLE_FINISHED),
+	                       reduceTasks[i].getLong(Keys.START_TIME)) %>
+	    </td>
+	  </tr>
+<%
+	}
+%>
+</table>
+<%  
+    Comparator<JobHistory.Task> cFinishShuffle = 
+      new Comparator<JobHistory.Task>() {
+      public int compare(JobHistory.Task t1, JobHistory.Task t2){
+        long l1 = t1.getLong(Keys.SHUFFLE_FINISHED); 
+        long l2 = t2.getLong(Keys.SHUFFLE_FINISHED);
+        return (l2<l1 ? -1 : (l2==l1 ? 0 : 1));
+      }
+    };
+    Arrays.sort(reduceTasks, cFinishShuffle);
+    JobHistory.Task lastShuffle = reduceTasks[0] ;
+%>
+
+<h3>The last Shuffle  
+<a href="taskdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile%>&taskid=<%=lastShuffle.get(Keys.TASKID)%>"><%=lastShuffle.get(Keys.TASKID)%>
+</a> finished at (relative to the Job launch time): 
+<%=StringUtils.getFormattedTimeWithDiff(dateFormat,
+                              lastShuffle.getLong(Keys.SHUFFLE_FINISHED), 
+                              job.getLong(Keys.LAUNCH_TIME) ) %></h3>
+
+<%
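+	// Sort by reduce-phase time (finish minus shuffle end), longest first.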
+	Comparator<JobHistory.Task> cReduce = new Comparator<JobHistory.Task>(){
+	  public int compare(JobHistory.Task t1, JobHistory.Task t2){
+	    long l1 = t1.getLong(Keys.FINISH_TIME) - 
+	                       t1.getLong(Keys.SHUFFLE_FINISHED); 
+	    long l2 = t2.getLong(Keys.FINISH_TIME) - 
+	                       t2.getLong(Keys.SHUFFLE_FINISHED);
+      return (l2<l1 ? -1 : (l2==l1 ? 0 : 1));
+	  }
+	}; 
+	Arrays.sort(reduceTasks, cReduce); 
+	JobHistory.Task minReduce = reduceTasks[reduceTasks.length-1] ;
+%>
+<hr/>
+<h3>Time taken by best performing Reduce task 
+<a href="taskdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile%>&taskid=<%=minReduce.get(Keys.TASKID)%>">
+<%=minReduce.get(Keys.TASKID) %></a> : 
+<%=StringUtils.formatTimeDiff(minReduce.getLong(Keys.FINISH_TIME),
+    minReduce.getLong(Keys.SHUFFLE_FINISHED) ) %></h3>
+
+<h3>Average time taken by Reduce tasks: 
+<%=StringUtils.formatTimeDiff(avgReduceTime, 0) %></h3>
+<h3>Worst performing reduce tasks</h3>
+<table border="2" cellpadding="5" cellspacing="2">
+<tr><td>Task Id</td><td>Time taken</td></tr>
+<%
+	for( int i=0;i<showTasks && i<reduceTasks.length; i++){
+%>
+		<tr>
+			<td><a href="taskdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile%>&taskid=<%=reduceTasks[i].get(Keys.TASKID)%>">
+			<%=reduceTasks[i].get(Keys.TASKID) %></a></td>
+			<td><%=StringUtils.formatTimeDiff(
+			    reduceTasks[i].getLong(Keys.FINISH_TIME), 
+			    reduceTasks[i].getLong(Keys.SHUFFLE_FINISHED)) %></td>
+		</tr>
+<%
+	}
+%>
+</table>
+<%  
+    Arrays.sort(reduceTasks, cFinishMapRed);
+    JobHistory.Task lastReduce = reduceTasks[0] ;
+%>
+
+<h3>The last Reduce task 
+<a href="taskdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile%>&taskid=<%=lastReduce.get(Keys.TASKID)%>"><%=lastReduce.get(Keys.TASKID)%>
+</a> finished at (relative to the Job launch time): 
+<%=StringUtils.getFormattedTimeWithDiff(dateFormat,
+                              lastReduce.getLong(Keys.FINISH_TIME), 
+                              job.getLong(Keys.LAUNCH_TIME) ) %></h3>
+ </center>
+ </body></html>
\ No newline at end of file

Added: hadoop/core/trunk/src/webapps/history/index.html
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/history/index.html?rev=629369&view=auto
==============================================================================
--- hadoop/core/trunk/src/webapps/history/index.html (added)
+++ hadoop/core/trunk/src/webapps/history/index.html Tue Feb 19 22:04:44 2008
@@ -0,0 +1,20 @@
+<html>
+
+<head>
+<meta HTTP-EQUIV="REFRESH" content="0;url=jobhistory.jsp"/>
+<title>Hadoop Administration - History</title>
+</head>
+
+<body>
+
+<h1>Hadoop Administration - History</h1>
+
+<ul>
+
+<li><a href="jobhistory.jsp">Job History</a></li>
+
+</ul>
+
+</body>
+
+</html>

Added: hadoop/core/trunk/src/webapps/history/jobconf_history.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/history/jobconf_history.jsp?rev=629369&view=auto
==============================================================================
--- hadoop/core/trunk/src/webapps/history/jobconf_history.jsp (added)
+++ hadoop/core/trunk/src/webapps/history/jobconf_history.jsp Tue Feb 19 22:04:44 2008
@@ -0,0 +1,58 @@
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.*"
+  import="javax.servlet.http.*"
+  import="java.io.*"
+  import="java.net.URL"
+  import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.fs.*"
+  import="org.apache.hadoop.util.*"
+%>
+
+
+<%
+  JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+  String jobId = request.getParameter("jobid");
+  if (jobId == null) {
+    out.println("<h2>Missing 'jobid' for fetching job configuration!</h2>");
+ 	return;
+  }
+%>
+  
+<html>
+<head>
+<title>Job Configuration: JobId - <%= jobId %></title>
+</head>
+<body>
+<h2>Job Configuration: JobId - <%= jobId %></h2><br>
+
+<%
+  Path logDir = new Path(request.getParameter("jobLogDir"));
+  Path jobFilePath = new Path(logDir, 
+                       request.getParameter("jobUniqueString") + "_conf.xml");
+  FileSystem fs = (FileSystem)request.getSession().getAttribute("fs");
+  FSDataInputStream jobFile = null; 
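+  // Render the job's stored XML configuration to HTML with the jobconf.xsl stylesheet.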
+  try {
+    jobFile = fs.open(jobFilePath);
+    JobConf jobConf = new JobConf(jobFilePath);
+    XMLUtils.transform(
+        jobConf.getConfResourceAsInputStream("webapps/static/jobconf.xsl"),
+        jobFile, out);
+  } catch (Exception e) {
+    out.println("Failed to retreive job configuration for job '" + jobId + "!");
+    out.println(e);
+  } finally {
+    if (jobFile != null) {
+      try { 
+        jobFile.close(); 
+      } catch (IOException e) {}
+    }
+  } 
+%>
+
+<br>
+<hr>
+<a href="http://lucene.apache.org/hadoop">Hadoop</a>, 2007.<br>
+
+</body>
+</html>

Added: hadoop/core/trunk/src/webapps/history/jobdetailshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/history/jobdetailshistory.jsp?rev=629369&view=auto
==============================================================================
--- hadoop/core/trunk/src/webapps/history/jobdetailshistory.jsp (added)
+++ hadoop/core/trunk/src/webapps/history/jobdetailshistory.jsp Tue Feb 19 22:04:44 2008
@@ -0,0 +1,201 @@
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.http.*"
+  import="java.io.*"
+  import="java.util.*"
+  import="org.apache.hadoop.fs.*"
+  import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.util.*"
+  import="java.text.SimpleDateFormat"
+  import="org.apache.hadoop.mapred.JobHistory.*"
+%>
+<jsp:include page="loadhistory.jsp">
+	<jsp:param name="jobid" value="<%=request.getParameter("jobid") %>"/>
+	<jsp:param name="logFile" value="<%=request.getParameter("logFile") %>"/>
+</jsp:include>
+<%! private static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss"); %>
+<%
+	String jobid = request.getParameter("jobid");
+	String logFile = request.getParameter("logFile");
+	
+	Path jobFile = new Path(logFile);
+	String[] jobDetails = jobFile.getName().split("_");
+    String jobUniqueString = jobDetails[0] + "_" +jobDetails[1] + "_" + jobid ;
+	
+	JobInfo job = (JobInfo)request.getSession().getAttribute("job");
+	FileSystem fs = (FileSystem)request.getSession().getAttribute("fs");
+%>
+<html><body>
+<h2>Hadoop Job <%=jobid %> on <a href="jobhistory.jsp">History Viewer</a></h2>
+
+<b>User: </b> <%=job.get(Keys.USER) %><br/> 
+<b>JobName: </b> <%=job.get(Keys.JOBNAME) %><br/> 
+<b>JobConf: </b> <a href="jobconf_history.jsp?jobid=<%=jobid%>&jobLogDir=<%=new Path(logFile).getParent().toString()%>&jobUniqueString=<%=jobUniqueString%>"> 
+                 <%=job.get(Keys.JOBCONF) %></a><br/> 
+<b>Submitted At: </b> <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getLong(Keys.SUBMIT_TIME), 0 )  %><br/> 
+<b>Launched At: </b> <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getLong(Keys.LAUNCH_TIME), job.getLong(Keys.SUBMIT_TIME)) %><br/>
+<b>Finished At: </b>  <%=StringUtils.getFormattedTimeWithDiff(dateFormat, job.getLong(Keys.FINISH_TIME), job.getLong(Keys.LAUNCH_TIME)) %><br/>
+<b>Status: </b> <%= ((job.get(Keys.JOB_STATUS) == null || "".equals(job.get(Keys.JOB_STATUS))) ? "Incomplete" : job.get(Keys.JOB_STATUS)) %><br/> 
+<%
+	Map<String, JobHistory.Task> tasks = job.getAllTasks();
+	int totalMaps = 0 ; 
+	int totalReduces = 0; 
+	int failedMaps = 0; 
+	int killedMaps = 0;
+	int failedReduces = 0 ; 
+	int killedReduces = 0;
+	
+	long mapStarted = 0 ; 
+	long mapFinished = 0 ; 
+	long reduceStarted = 0 ; 
+	long reduceFinished = 0; 
+        
+        Map <String,String> allHosts = new TreeMap<String,String>();
+	
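+	// Walk every task attempt, tallying totals and failed/killed counts per phase.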
+	for( JobHistory.Task task : tasks.values() ) {
+	  
+	  long startTime = task.getLong(Keys.START_TIME) ; 
+	  long finishTime = task.getLong(Keys.FINISH_TIME) ; 
+	  
+          allHosts.put(task.get(Keys.HOSTNAME), "");
+
+	  if( Values.MAP.name().equals(task.get(Keys.TASK_TYPE)) ){
+	    if( mapStarted==0 || mapStarted > startTime ){
+	      mapStarted = startTime; 
+	    }
+	    if(  mapFinished < finishTime ){
+	      mapFinished = finishTime ; 
+	    }
+	    
+	    Map<String, TaskAttempt> attempts = task.getTaskAttempts();
+	    for( TaskAttempt attempt : attempts.values() ) {
+	        totalMaps++; 
+	        if( Values.FAILED.name().equals(attempt.get(Keys.TASK_STATUS)) ) {
+	            failedMaps++; 
+	        }
+	        if( Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS)) ) {
+	            killedMaps++; 
+	        }
+	    }
+	  }else{
+	    if( reduceStarted==0||reduceStarted > startTime ){
+	      reduceStarted = startTime ; 
+	    }
+	    if(  reduceFinished < finishTime ){
+	      reduceFinished = finishTime; 
+	    }
+	    Map<String, TaskAttempt> attempts = task.getTaskAttempts();
+	    for( TaskAttempt attempt : attempts.values() ) {
+	        totalReduces++; 
+	        if( Values.FAILED.name().equals(attempt.get(Keys.TASK_STATUS)) ) {
+	            failedReduces++; 
+	        }
+	        if( Values.KILLED.name().equals(attempt.get(Keys.TASK_STATUS)) ) {
+	            killedReduces++; 
+	        }
+	    }
+	  }
+	}
+%>
+<b><a href="analysejobhistory.jsp?jobid=<%=jobid %>&logFile=<%=logFile %>">Analyse This Job</a></b> 
+<hr/>
+<center>
+<table border="2" cellpadding="5" cellspacing="2">
+<tr>
+<td>Kind</td><td>Total Tasks (successful+failed+killed)</td><td>Successful tasks</td><td>Failed tasks</td><td>Killed tasks</td><td>Start Time</td><td>Finish Time</td>
+</tr>
+<tr>
+<td>Map</td>
+	<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=logFile %>&taskType=<%=Values.MAP.name() %>&status=all">
+	  <%=totalMaps %></a></td>
+	<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=logFile %>&taskType=<%=Values.MAP.name() %>&status=<%=Values.SUCCESS %>">
+	  <%=job.getInt(Keys.FINISHED_MAPS) %></a></td>
+	<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=logFile %>&taskType=<%=Values.MAP.name() %>&status=<%=Values.FAILED %>">
+	  <%=failedMaps %></a></td>
+	<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=logFile %>&taskType=<%=Values.MAP.name() %>&status=<%=Values.KILLED %>">
+	  <%=killedMaps %></a></td>
+	<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, mapStarted, 0) %></td>
+	<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, mapFinished, mapStarted) %></td>
+</tr>
+<tr>
+<td>Reduce</td>
+	<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=logFile %>&taskType=<%=Values.REDUCE.name() %>&status=all">
+	  <%=totalReduces%></a></td>
+	<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=logFile %>&taskType=<%=Values.REDUCE.name() %>&status=<%=Values.SUCCESS %>">
+	  <%=job.getInt(Keys.FINISHED_REDUCES)%></a></td>
+	<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=logFile %>&taskType=<%=Values.REDUCE.name() %>&status=<%=Values.FAILED %>">
+	  <%=failedReduces%></a></td>
+	<td><a href="jobtaskshistory.jsp?jobid=<%=jobid %>&logFile=<%=logFile %>&taskType=<%=Values.REDUCE.name() %>&status=<%=Values.KILLED %>">
+	  <%=killedReduces%></a></td>  
+	<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, reduceStarted, 0) %></td>
+	<td><%=StringUtils.getFormattedTimeWithDiff(dateFormat, reduceFinished, reduceStarted) %></td>
+</tr>
+ </table>
+
+<br/>
+ <%
+	DefaultJobHistoryParser.FailedOnNodesFilter filter = new DefaultJobHistoryParser.FailedOnNodesFilter();
+	JobHistory.parseHistoryFromFS(logFile, filter, fs); 
+	Map<String, Set<String>> badNodes = filter.getValues(); 
+	if( badNodes.size() > 0 ) {
+ %>
+<h3>Failed task attempts by node</h3>
+<table border="1">
+<tr><td>Hostname</td><td>Failed Tasks</td></tr>
+ <%	  
+  for (Map.Entry<String, Set<String>> entry : badNodes.entrySet()) {
+    String node = entry.getKey();
+    Set<String> failedTasks = entry.getValue();
+%>
+	<tr>
+		<td><%=node %></td>
+		<td>
+<%
+		for( String t : failedTasks ) {
+%>
+		 <a href="taskdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile %>&taskid=<%=t %>"><%=t %></a>,&nbsp;
+<%		  
+		}
+%>	
+		</td>
+	</tr>
+<%	  
+     }
+%>
+</table>
+<%
+	}
+%>
+<br/>
+ <%
+	DefaultJobHistoryParser.KilledOnNodesFilter killedFilter = new DefaultJobHistoryParser.KilledOnNodesFilter();
+	JobHistory.parseHistoryFromFS(logFile, killedFilter, fs); 
+	badNodes = killedFilter.getValues(); 
+	if( badNodes.size() > 0 ) {
+ %>
+<h3>Killed task attempts by node</h3>
+<table border="1">
+<tr><td>Hostname</td><td>Killed Tasks</td></tr>
+ <%	  
+  for (Map.Entry<String, Set<String>> entry : badNodes.entrySet()) {
+    String node = entry.getKey();
+    Set<String> killedTasks = entry.getValue();
+%>
+	<tr>
+		<td><%=node %></td>
+		<td>
+<%
+		for( String t : killedTasks ) {
+%>
+		 <a href="taskdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile %>&taskid=<%=t %>"><%=t %></a>,&nbsp;
+<%		  
+		}
+%>	
+		</td>
+	</tr>
+<%	  
+     }
+%>
+</table>
+<%
+	}
+%>
+ </center>
+
+</body></html>

Added: hadoop/core/trunk/src/webapps/history/jobhistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/history/jobhistory.jsp?rev=629369&view=auto
==============================================================================
--- hadoop/core/trunk/src/webapps/history/jobhistory.jsp (added)
+++ hadoop/core/trunk/src/webapps/history/jobhistory.jsp Tue Feb 19 22:04:44 2008
@@ -0,0 +1,104 @@
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="java.io.*"
+  import="java.util.*"
+  import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.util.*"
+  import="org.apache.hadoop.fs.*"
+  import="javax.servlet.jsp.*"
+  import="java.text.SimpleDateFormat"
+  import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.mapred.JobHistory.*"
+%>
+<%!	
+	private static SimpleDateFormat dateFormat = new SimpleDateFormat("d/MM HH:mm:ss") ;
+%>
+<html>
+<head>
+<title>Hadoop Map/Reduce History Viewer</title>
+<link rel="stylesheet" type="text/css" href="/static/hadoop.css">
+</head>
+<body>
+<h1>Hadoop Map/Reduce History Viewer</h1>
+<hr>
+<h2>Available History </h2>
+<%
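+    // Skip the *_conf.xml files that sit next to each job history log.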
+    PathFilter jobLogFileFilter = new PathFilter() {
+      public boolean accept(Path path) {
+        return !(path.getName().endsWith(".xml"));
+      }
+    };
+    
+    FileSystem fs = (FileSystem) application.getAttribute("fileSys");
+    String historyLogDir = (String) application.getAttribute("historyLogDir");
+    Path[] jobFiles = FileUtil.stat2Paths(fs.listStatus(new Path(historyLogDir),
+                                                        jobLogFileFilter));
+    // Check for a missing directory before sorting, not after.
+    if (jobFiles == null) {
+      out.println("<h3>No job history files found.</h3>");
+      return; 
+    }
+
+    // sort the files on creation time.
+    Arrays.sort(jobFiles, new Comparator<Path>() {
+      public int compare(Path p1, Path p2) {
+        String[] split1 = p1.getName().split("_");
+        String[] split2 = p2.getName().split("_");
+        
+        // compare job tracker start time
+        int res = new Date(Long.parseLong(split1[1])).compareTo(
+                             new Date(Long.parseLong(split2[1])));
+        if (res == 0) {
+          res = new Date(Long.parseLong(split1[3])).compareTo(
+                           new Date(Long.parseLong(split2[3])));
+        }
+        if (res == 0) {
+          Long l1 = Long.parseLong(split1[4]);
+          res = l1.compareTo(Long.parseLong(split2[4]));
+        }
+        
+        return res;
+      }
+    });
+
+    out.print("<table align=center border=2 cellpadding=\"5\" cellspacing=\"2\">");
+    out.print("<tr><td align=\"center\" colspan=\"9\"><b>Available Jobs </b></td></tr>\n");
+    out.print("<tr>");
+    out.print("<td>Job tracker Host Name</td>" +
+              "<td>Job tracker Start time</td>" +
+              "<td>Job Id</td><td>Name</td><td>User</td>") ; 
+    out.print("</tr>"); 
+    for (Path jobFile: jobFiles) {
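+      // File name fields: trackerHost_startTime_<jobId part 1>_<part 2>_<part 3>_user_jobName.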
+      String[] jobDetails = jobFile.getName().split("_");
+      String trackerHostName = jobDetails[0];
+      String trackerStartTime = jobDetails[1];
+      String jobId = jobDetails[2] + "_" +jobDetails[3] + "_" + jobDetails[4] ;
+      String user = jobDetails[5];
+      String jobName = jobDetails[6];
+      
+	  printJob(trackerHostName, trackerStartTime, jobId,
+               jobName, user, jobFile.toString(), out) ; 
+	} // end of loop over job files
+	out.print("</table>");
+%>
+<%!
+	private void printJob(String trackerHostName, String trackerid,
+                          String jobId, String jobName,
+                          String user, String logFile, JspWriter out)
+    throws IOException{
+	    out.print("<tr>"); 
+	    out.print("<td>" + trackerHostName + "</td>"); 
+	    out.print("<td>" + new Date(Long.parseLong(trackerid)) + "</td>"); 
+	    out.print("<td>" + "<a href=\"jobdetailshistory.jsp?jobid="+ jobId + 
+	        "&logFile=" + logFile +"\">" + jobId + "</a></td>"); 
+	    out.print("<td>" + jobName + "</td>"); 
+	    out.print("<td>" + user + "</td>"); 
+	    out.print("</tr>");
+	}
+ %> 
+</body></html>

Added: hadoop/core/trunk/src/webapps/history/jobtaskshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/history/jobtaskshistory.jsp?rev=629369&view=auto
==============================================================================
--- hadoop/core/trunk/src/webapps/history/jobtaskshistory.jsp (added)
+++ hadoop/core/trunk/src/webapps/history/jobtaskshistory.jsp Tue Feb 19 22:04:44 2008
@@ -0,0 +1,62 @@
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.http.*"
+  import="java.io.*"
+  import="java.util.*"
+  import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.util.*"
+  import="java.text.SimpleDateFormat"
+  import="org.apache.hadoop.mapred.JobHistory.*"
+%>
+<jsp:include page="loadhistory.jsp">
+	<jsp:param name="jobid" value="<%=request.getParameter("jobid") %>"/>
+	<jsp:param name="logFile" value="<%=request.getParameter("logFile") %>"/>
+</jsp:include>
+<%!	private static SimpleDateFormat dateFormat = new SimpleDateFormat("d/MM HH:mm:ss") ; %>
+
+<%	
+	String jobid = request.getParameter("jobid");
+	String logFile = request.getParameter("logFile");
+	String taskStatus = request.getParameter("status"); 
+	String taskType = request.getParameter("taskType"); 
+	
+	JobHistory.JobInfo job = (JobHistory.JobInfo)request.getSession().getAttribute("job");
+	
+	Map<String, JobHistory.Task> tasks = job.getAllTasks(); 
+%>
+<html>
+<body>
+<h2><%=taskStatus%> <%=taskType %> task list for <a href="jobdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile %>"><%=jobid %> </a></h2>
+<center>
+<table border="2" cellpadding="5" cellspacing="2">
+<tr><td>Task Id</td><td>Start Time</td><td>Finish Time</td><td>Error</td></tr>
+<%
+	for( JobHistory.Task task : tasks.values() ) {
+	  if( taskType.equals(task.get(Keys.TASK_TYPE) ) ){
+            Map <String, TaskAttempt> taskAttempts = task.getTaskAttempts();
+            for (JobHistory.TaskAttempt taskAttempt : taskAttempts.values()) {
+	      if( taskStatus.equals(taskAttempt.get(Keys.TASK_STATUS)) || taskStatus.equals("all")){
+	         printTask(jobid, logFile, task, out); 
+	      }
+            }
+	  }
+	}
+%>
+</table>
+<%!
+	private void printTask(String jobid, String trackerId, JobHistory.Task task, JspWriter out) throws IOException{
+  		out.print("<tr>"); 
+  		out.print("<td>" + "<a href=\"taskdetailshistory.jsp?jobid=" + jobid + 
+  		    "&logFile="+ trackerId +"&taskid="+task.get(Keys.TASKID)+"\">" + 
+  		    task.get(Keys.TASKID) + "</a></td>");
+  		out.print("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, 
+  		    task.getLong(Keys.START_TIME), 0 ) + "</td>");
+		out.print("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, 
+		    task.getLong(Keys.FINISH_TIME), task.getLong(Keys.START_TIME) ) + "</td>");
+  		out.print("<td>" + task.get(Keys.ERROR) + "</td>");
+  		out.print("</tr>"); 
+	}
+%>
+</center>
+</body>
+</html>

Added: hadoop/core/trunk/src/webapps/history/loadhistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/history/loadhistory.jsp?rev=629369&view=auto
==============================================================================
--- hadoop/core/trunk/src/webapps/history/loadhistory.jsp (added)
+++ hadoop/core/trunk/src/webapps/history/loadhistory.jsp Tue Feb 19 22:04:44 2008
@@ -0,0 +1,35 @@
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="java.io.*"
+  import="java.util.*"
+  import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.fs.*"
+  import="org.apache.hadoop.util.*"
+  import="javax.servlet.jsp.*"
+  import="java.text.SimpleDateFormat"  
+  import="org.apache.hadoop.mapred.JobHistory.*"
+%>
+<%
+	FileSystem fs = (FileSystem) application.getAttribute("fileSys");
+	String jobId =  (String)request.getParameter("jobid");
+	JobHistory.JobInfo job = (JobHistory.JobInfo)request.getSession().getAttribute("job");
+	if (null != job && (!jobId.equals(job.get(Keys.JOBID)))){
+      // remove jobInfo from session, keep only one job in session at a time
+      request.getSession().removeAttribute("job"); 
+      job = null ; 
+    }
+	
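+	// Parse the history log only when it is not already cached in the session.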
+	if (null == job) {
+      String jobLogFile = (String)request.getParameter("logFile");
+      job = new JobHistory.JobInfo(jobId); 
+      DefaultJobHistoryParser.parseJobTasks(jobLogFile, job, fs) ; 
+      request.getSession().setAttribute("job", job);
+      request.getSession().setAttribute("fs", fs);
+	}
+%>

Added: hadoop/core/trunk/src/webapps/history/taskdetailshistory.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/history/taskdetailshistory.jsp?rev=629369&view=auto
==============================================================================
--- hadoop/core/trunk/src/webapps/history/taskdetailshistory.jsp (added)
+++ hadoop/core/trunk/src/webapps/history/taskdetailshistory.jsp Tue Feb 19 22:04:44 2008
@@ -0,0 +1,69 @@
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.http.*"
+  import="java.io.*"
+  import="java.util.*"
+  import="org.apache.hadoop.mapred.*"
+  import="org.apache.hadoop.util.*"
+  import="java.text.SimpleDateFormat"
+  import="org.apache.hadoop.mapred.JobHistory.*"
+%>
+<jsp:include page="loadhistory.jsp">
+	<jsp:param name="jobid" value="<%=request.getParameter("jobid") %>"/>
+	<jsp:param name="logFile" value="<%=request.getParameter("logFile") %>"/>
+</jsp:include>
+<%!	private static SimpleDateFormat dateFormat = new SimpleDateFormat("d/MM HH:mm:ss") ; %>
+
+<%	
+	String jobid = request.getParameter("jobid");
+	String logFile = request.getParameter("logFile");
+	String taskid = request.getParameter("taskid"); 
+
+	JobHistory.JobInfo job = (JobHistory.JobInfo)request.getSession().getAttribute("job");
+	JobHistory.Task task = job.getAllTasks().get(taskid); 
+%>
+<html>
+<body>
+<h2><%=taskid %> attempts for <a href="jobdetailshistory.jsp?jobid=<%=jobid%>&logFile=<%=logFile %>"> <%=jobid %> </a></h2>
+<center>
+<table border="2" cellpadding="5" cellspacing="2">
+<tr><td>Task Id</td><td>Start Time</td>
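+<%-- Shuffle and sort columns are shown only for reduce tasks. --%>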
+<%	
+	if( Values.REDUCE.name().equals(task.get(Keys.TASK_TYPE) ) ){
+%>
+		<td>Shuffle Finished</td><td>Sort Finished</td>
+<%
+	}
+%>
+<td>Finish Time</td><td>Host</td><td>Error</td></tr>
+<%
+	for( JobHistory.TaskAttempt attempt : task.getTaskAttempts().values() ) {
+	  printTaskAttempt(attempt, task.get(Keys.TASK_TYPE), out); 
+	}
+%>
+</table>
+<%!
+	private void printTaskAttempt(JobHistory.TaskAttempt taskAttempt, String type, JspWriter out) throws IOException{
+	  out.print("<tr>"); 
+	  out.print("<td>" + taskAttempt.get(Keys.TASK_ATTEMPT_ID) + "</td>");
+	  out.print("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, taskAttempt.getLong(Keys.START_TIME), 0 ) + "</td>") ; 
+	  if (Values.REDUCE.name().equals(type)) {
+	    JobHistory.ReduceAttempt reduceAttempt = (JobHistory.ReduceAttempt)taskAttempt; 
+	    // Reduce attempts carry two extra timestamps: shuffle end and sort end.
+	    out.print("<td>" + 
+	        StringUtils.getFormattedTimeWithDiff(dateFormat, 
+	            reduceAttempt.getLong(Keys.SHUFFLE_FINISHED), 
+	            reduceAttempt.getLong(Keys.START_TIME)) + "</td>") ; 
+	    out.print("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, 
+	        reduceAttempt.getLong(Keys.SORT_FINISHED), 
+	        reduceAttempt.getLong(Keys.SHUFFLE_FINISHED)) + "</td>") ; 
+	  }
+	  out.print("<td>" + StringUtils.getFormattedTimeWithDiff(dateFormat, taskAttempt.getLong(Keys.FINISH_TIME), 
+	      taskAttempt.getLong(Keys.START_TIME) ) + "</td>") ; 
+	  out.print("<td>" + taskAttempt.get(Keys.HOSTNAME) + "</td>");
+	  out.print("<td>" + taskAttempt.get(Keys.ERROR) + "</td>");
+	  out.print("</tr>"); 
+	}
+%>
+</center>
+</body>
+</html>

Modified: hadoop/core/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobtracker.jsp?rev=629369&r1=629368&r2=629369&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/jobtracker.jsp Tue Feb 19 22:04:44 2008
@@ -134,7 +134,7 @@
 <hr>
 
 <h2>Local logs</h2>
-<a href="logs/">Log</a> directory, <a href="jobhistory.jsp?historyFile=JobHistory.log&reload=true">
+<a href="logs/">Log</a> directory, <a href="http://<%=tracker.getHistoryAddress()%>">
 Job Tracker History</a>
 
 <%


