hadoop-mapreduce-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r1130185 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/...
Date: Wed, 01 Jun 2011 15:24:12 GMT
Author: acmurthy
Date: Wed Jun  1 15:24:12 2011
New Revision: 1130185

URL: http://svn.apache.org/viewvc?rev=1130185&view=rev
Log:
MAPREDUCE-2551. Added JobSummaryLog. Contributed by Siddharth Seth.
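
The summary is emitted as a single comma-separated line of key=value pairs, built by JobSummary.getJobSummaryString() (added below). With purely illustrative values, a summary line would look roughly like:

  jobId=job_1306941997188_0001,submitTime=1306941997188,launchTime=1306941998250,firstMapTaskLaunchTime=1306942001000,firstReduceTaskLaunchTime=1306942010000,finishTime=1306942050000,numMaps=2,numReduces=1,user=mapred,queue=default,status=SUCCEEDED,mapSlotSeconds=0,reduceSlotSeconds=0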

Added:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobIdPBImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1130185&r1=1130184&r2=1130185&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Wed Jun  1 15:24:12 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
     
+    MAPREDUCE-2551. Added JobSummaryLog. (Siddharth Seth via acmurthy)
+     
     Added ability to decommission nodes and completed RM administration tools
     to achieve parity with JobTracker. (acmurthy) 
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1130185&r1=1130184&r2=1130185&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Wed Jun  1 15:24:12 2011
@@ -36,7 +36,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -44,7 +43,7 @@ import org.apache.hadoop.mapreduce.v2.jo
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -253,7 +252,7 @@ public class JobHistoryEventHandler exte
    * @param jobId the jobId.
    * @throws IOException
    */
-  protected void setupEventWriter(JobId jobId)
+  protected void setupEventWriter(JobId jobId, JobSubmittedEvent jse)
   throws IOException {
     if (logDirPath == null) {
       LOG.error("Log Directory is null, returning");
@@ -263,13 +262,17 @@ public class JobHistoryEventHandler exte
     MetaInfo oldFi = fileMap.get(jobId);
     Configuration conf = getConfig();
 
-    long submitTime = (oldFi == null ? context.getClock().getTime() : oldFi.getJobIndexInfo().getSubmitTime());
+    long submitTime = oldFi == null ? jse.getSubmitTime() : oldFi
+        .getJobIndexInfo().getSubmitTime();
     
-    //TODO Ideally this should be written out to the job dir (.staging/jobid/files - RecoveryService will need to be patched)
-    Path logFile = JobHistoryUtils.getStagingJobHistoryFile(logDirPath, jobId, startCount);
-    String user = conf.get(MRJobConfig.USER_NAME);
+    // TODO Ideally this should be written out to the job dir
+    // (.staging/jobid/files - RecoveryService will need to be patched)
+    Path logFile = JobHistoryUtils.getStagingJobHistoryFile(logDirPath, jobId,
+        startCount);
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
     if (user == null) {
-      throw new IOException("User is null while setting up jobhistory eventwriter" );
+      throw new IOException(
+          "User is null while setting up jobhistory eventwriter");
     }
     String jobName = TypeConverter.fromYarn(jobId).toString();
     EventWriter writer = (oldFi == null) ? null : oldFi.writer;
@@ -277,18 +280,22 @@ public class JobHistoryEventHandler exte
     if (writer == null) {
       try {
         FSDataOutputStream out = logDirFS.create(logFile, true);
-        //TODO Permissions for the history file?
         writer = new EventWriter(out);
-        LOG.info("Event Writer setup for JobId: " + jobId + ", File: " + logFile);
+        LOG.info("Event Writer setup for JobId: " + jobId + ", File: "
+            + logFile);
       } catch (IOException ioe) {
-        LOG.info("Could not create log file: [" + logFile + "] + for job " + "[" + jobName
+ "]");
+        LOG.info("Could not create log file: [" + logFile + "] + for job "
+            + "[" + jobName + "]");
         throw ioe;
       }
     }
     
     Path logDirConfPath = null;
     if (conf != null) {
-      logDirConfPath = JobHistoryUtils.getStagingConfFile(logDirPath, jobId, startCount);
+      // TODO Ideally this should be written out to the job dir
+      // (.staging/jobid/files - RecoveryService will need to be patched)
+      logDirConfPath = JobHistoryUtils.getStagingConfFile(logDirPath, jobId,
+          startCount);
       FSDataOutputStream jobFileOut = null;
       try {
         if (logDirConfPath != null) {
@@ -297,12 +304,15 @@ public class JobHistoryEventHandler exte
           jobFileOut.close();
         }
       } catch (IOException e) {
-        LOG.info("Failed to close the job configuration file "
-            + StringUtils.stringifyException(e));
+        LOG.info("Failed to write the job configuration file", e);
+        throw e;
       }
     }
     
-    MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer, submitTime, user, jobName, jobId);
+    MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer, submitTime,
+        user, jobName, jobId);
+    fi.getJobSummary().setJobId(jobId);
+    fi.getJobSummary().setJobSubmitTime(submitTime);
     fileMap.put(jobId, fi);
   }
 
@@ -331,14 +341,15 @@ public class JobHistoryEventHandler exte
   }
 
   protected void handleEvent(JobHistoryEvent event) {
-    // check for first event from a job
-    //TODO Log a meta line with version information.
     synchronized (lock) {
     if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
       try {
-        setupEventWriter(event.getJobID());
+          JobSubmittedEvent jobSubmittedEvent = (JobSubmittedEvent) event
+              .getHistoryEvent();
+          setupEventWriter(event.getJobID(), jobSubmittedEvent);
       } catch (IOException ioe) {
-        LOG.error("Error JobHistoryEventHandler in handleEvent: " + event, ioe);
+          LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
+              ioe);
         throw new YarnException(ioe);
       }
     }
@@ -346,7 +357,9 @@ public class JobHistoryEventHandler exte
     try {
       HistoryEvent historyEvent = event.getHistoryEvent();
       mi.writeEvent(historyEvent);
-      LOG.info("In HistoryEventHandler " + event.getHistoryEvent().getEventType());
+        processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary());
+        LOG.info("In HistoryEventHandler "
+            + event.getHistoryEvent().getEventType());
     } catch (IOException e) {
       LOG.error("Error writing History Event: " + event.getHistoryEvent(), e);
       throw new YarnException(e);
@@ -354,10 +367,12 @@ public class JobHistoryEventHandler exte
     // check for done
     if (event.getHistoryEvent().getEventType().equals(EventType.JOB_FINISHED)) {
       try {
-        JobFinishedEvent jFinishedEvent = (JobFinishedEvent)event.getHistoryEvent();
+          JobFinishedEvent jFinishedEvent = (JobFinishedEvent) event
+              .getHistoryEvent();
         mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime());
         mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getFinishedMaps());
-        mi.getJobIndexInfo().setNumReduces(jFinishedEvent.getFinishedReduces());
+          mi.getJobIndexInfo().setNumReduces(
+              jFinishedEvent.getFinishedReduces());
         closeEventWriter(event.getJobID());
       } catch (IOException e) {
         throw new YarnException(e);
@@ -366,7 +381,52 @@ public class JobHistoryEventHandler exte
   }
   }
 
-  //TODO Path is intermediate_done/user -> Work with this throughout.
+  private void processEventForJobSummary(HistoryEvent event, JobSummary summary) {
+    // context.getJob could be used for some of this info as well.
+    switch (event.getEventType()) {
+    case JOB_SUBMITTED:
+      JobSubmittedEvent jse = (JobSubmittedEvent) event;
+      summary.setUser(jse.getUserName());
+      summary.setQueue(jse.getJobQueueName());
+      break;
+    case JOB_INITED:
+      JobInitedEvent jie = (JobInitedEvent) event;
+      summary.setJobLaunchTime(jie.getLaunchTime());
+      break;
+    case MAP_ATTEMPT_STARTED:
+      TaskAttemptStartedEvent mtase = (TaskAttemptStartedEvent) event;
+      if (summary.getFirstMapTaskLaunchTime() == 0)
+        summary.setFirstMapTaskLaunchTime(mtase.getStartTime());
+      break;
+    case REDUCE_ATTEMPT_STARTED:
+      TaskAttemptStartedEvent rtase = (TaskAttemptStartedEvent) event;
+      if (summary.getFirstReduceTaskLaunchTime() == 0)
+        summary.setFirstReduceTaskLaunchTime(rtase.getStartTime());
+      break;
+    case JOB_FINISHED:
+      JobFinishedEvent jfe = (JobFinishedEvent) event;
+      summary.setJobFinishTime(jfe.getFinishTime());
+      summary.setNumFinishedMaps(jfe.getFinishedMaps());
+      summary.setNumFailedMaps(jfe.getFailedMaps());
+      summary.setNumFinishedReduces(jfe.getFinishedReduces());
+      summary.setNumFailedReduces(jfe.getFailedReduces());
+      if (summary.getJobStatus() == null)
+        summary
+            .setJobStatus(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED
+                .toString());
+      // TODO JOB_FINISHED does not have state. Effectively job history does not
+      // have state about the finished job.
+      break;
+    case JOB_FAILED:
+    case JOB_KILLED:
+      JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent) event;
+      summary.setJobStatus(juce.getStatus());
+      break;
+    // TODO Verify: MRV2 + MRV1. A JOB_FINISHED event will always come in after
+    // this. Stats on taskCounts can be set via that.
+    }
+  }
+
   protected void closeEventWriter(JobId jobId) throws IOException {
     final MetaInfo mi = fileMap.get(jobId);
     
@@ -387,15 +447,35 @@ public class JobHistoryEventHandler exte
       LOG.warn("No file for jobconf with " + jobId + " found in cache!");
       }
       
+    // Writing out the summary file.
+    // TODO JH enhancement - reuse this file to store additional indexing info
+    // like ACLs, etc. JHServer can use HDFS append to build an index file
+    // with more info than is available via the filename.
+    Path qualifiedSummaryDoneFile = null;
+    FSDataOutputStream summaryFileOut = null;
+    try {
+      String doneSummaryFileName = getTempFileName(JobHistoryUtils
+          .getIntermediateSummaryFileName(jobId));
+      qualifiedSummaryDoneFile = doneDirFS.makeQualified(new Path(
+          doneDirPrefixPath, doneSummaryFileName));
+      summaryFileOut = doneDirFS.create(qualifiedSummaryDoneFile, true);
+      summaryFileOut.writeUTF(mi.getJobSummary().getJobSummaryString());
+      summaryFileOut.close();
+    } catch (IOException e) {
+      LOG.info("Unable to write out JobSummaryInfo to ["
+          + qualifiedSummaryDoneFile + "]", e);
+      throw e;
+    }
+
     Path qualifiedDoneFile = null;
         try {
       if (mi.getHistoryFile() != null) {
       Path logFile = mi.getHistoryFile();
       Path qualifiedLogFile = logDirFS.makeQualified(logFile);
-        String doneJobHistoryFileName = getTempFileName(FileNameIndexUtils.getDoneFileName(mi
-            .getJobIndexInfo()));
-        qualifiedDoneFile = doneDirFS.makeQualified(new Path(
-            doneDirPrefixPath, doneJobHistoryFileName));
+        String doneJobHistoryFileName = getTempFileName(FileNameIndexUtils
+            .getDoneFileName(mi.getJobIndexInfo()));
+        qualifiedDoneFile = doneDirFS.makeQualified(new Path(doneDirPrefixPath,
+            doneJobHistoryFileName));
       moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
       }
       
@@ -410,7 +490,7 @@ public class JobHistoryEventHandler exte
       moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
       }
       
-      
+      moveTmpToDone(qualifiedSummaryDoneFile);
       moveTmpToDone(qualifiedConfDoneFile);
       moveTmpToDone(qualifiedDoneFile);
     } catch (IOException e) {
@@ -424,6 +504,7 @@ public class JobHistoryEventHandler exte
     private Path confFile;
     private EventWriter writer;
     JobIndexInfo jobIndexInfo;
+    JobSummary jobSummary;
 
     MetaInfo(Path historyFile, Path conf, EventWriter writer, long submitTime,
              String user, String jobName, JobId jobId) {
@@ -431,6 +512,7 @@ public class JobHistoryEventHandler exte
       this.confFile = conf;
       this.writer = writer;
       this.jobIndexInfo = new JobIndexInfo(submitTime, -1, user, jobName, jobId, -1, -1);
+      this.jobSummary = new JobSummary();
     }
 
     Path getHistoryFile() { return historyFile; }
@@ -439,6 +521,8 @@ public class JobHistoryEventHandler exte
 
     JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
 
+    JobSummary getJobSummary() { return jobSummary; }
+
     void closeWriter() throws IOException {
       synchronized (lock) {
       if (writer != null) {
@@ -504,21 +588,4 @@ public class JobHistoryEventHandler exte
     //TODO. Some error checking here.
     return tmpFileName.substring(0, tmpFileName.length()-4);
   }
-
-  private void writeStatus(String statusstoredir, HistoryEvent event) throws IOException {
-    try {
-      Path statusstorepath = doneDirFS.makeQualified(new Path(statusstoredir));
-      doneDirFS.mkdirs(statusstorepath,
-         new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
-      Path toPath = new Path(statusstoredir, "jobstats");
-      FSDataOutputStream out = doneDirFS.create(toPath, true);
-      EventWriter writer = new EventWriter(out);
-      writer.write(event);
-      writer.close();
-      out.close();
-    } catch (IOException ioe) {
-        LOG.error("Status file write failed" +ioe);
-        throw ioe;
-    }
-  }
 }
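
A note on the on-disk format: closeEventWriter() persists the summary with FSDataOutputStream.writeUTF(), i.e. java.io.DataOutput's modified-UTF-8 encoding with a two-byte length prefix (which also caps the summary string at 64KB). The read side, JobHistory.getJobSummary() below, pairs it with readUTF(). A minimal, self-contained sketch of that round trip, using plain java.io streams instead of HDFS:

  import java.io.*;

  public class SummaryRoundTrip {
    public static void main(String[] args) throws IOException {
      String summary = "jobId=job_1306941997188_0001,status=SUCCEEDED"; // illustrative value
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      new DataOutputStream(bos).writeUTF(summary); // 2-byte length prefix + modified UTF-8
      DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
      System.out.println(in.readUTF().equals(summary)); // prints: true
    }
  }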

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java?rev=1130185&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java Wed Jun  1 15:24:12 2011
@@ -0,0 +1,231 @@
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.util.StringUtils;
+
+public class JobSummary {
+  private JobId jobId;
+  private long jobSubmitTime;
+  private long jobLaunchTime;
+  private long firstMapTaskLaunchTime; // MapAttemptStarted |
+                                       // TaskAttemptStartedEvent
+  private long firstReduceTaskLaunchTime; // ReduceAttemptStarted |
+                                          // TaskAttemptStartedEvent
+  private long jobFinishTime;
+  private int numFinishedMaps;
+  private int numFailedMaps;
+  private int numFinishedReduces;
+  private int numFailedReduces;
+  // private int numSlotsPerMap; | Doesn't make sense with potentially different
+  // resource models
+  // private int numSlotsPerReduce; | Doesn't make sense with potentially
+  // different resource models
+  private String user;
+  private String queue;
+  private String jobStatus;
+  private long mapSlotSeconds; // TODO Not generated yet in MRV2
+  private long reduceSlotSeconds; // TODO Not generated yet MRV2
+  // private int clusterSlotCapacity;
+
+  JobSummary() {
+  }
+
+  public JobId getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(JobId jobId) {
+    this.jobId = jobId;
+  }
+
+  public long getJobSubmitTime() {
+    return jobSubmitTime;
+  }
+
+  public void setJobSubmitTime(long jobSubmitTime) {
+    this.jobSubmitTime = jobSubmitTime;
+  }
+
+  public long getJobLaunchTime() {
+    return jobLaunchTime;
+  }
+
+  public void setJobLaunchTime(long jobLaunchTime) {
+    this.jobLaunchTime = jobLaunchTime;
+  }
+
+  public long getFirstMapTaskLaunchTime() {
+    return firstMapTaskLaunchTime;
+  }
+
+  public void setFirstMapTaskLaunchTime(long firstMapTaskLaunchTime) {
+    this.firstMapTaskLaunchTime = firstMapTaskLaunchTime;
+  }
+
+  public long getFirstReduceTaskLaunchTime() {
+    return firstReduceTaskLaunchTime;
+  }
+
+  public void setFirstReduceTaskLaunchTime(long firstReduceTaskLaunchTime) {
+    this.firstReduceTaskLaunchTime = firstReduceTaskLaunchTime;
+  }
+
+  public long getJobFinishTime() {
+    return jobFinishTime;
+  }
+
+  public void setJobFinishTime(long jobFinishTime) {
+    this.jobFinishTime = jobFinishTime;
+  }
+
+  public int getNumFinishedMaps() {
+    return numFinishedMaps;
+  }
+
+  public void setNumFinishedMaps(int numFinishedMaps) {
+    this.numFinishedMaps = numFinishedMaps;
+  }
+
+  public int getNumFailedMaps() {
+    return numFailedMaps;
+  }
+
+  public void setNumFailedMaps(int numFailedMaps) {
+    this.numFailedMaps = numFailedMaps;
+  }
+
+  // public int getNumSlotsPerMap() {
+  // return numSlotsPerMap;
+  // }
+  //
+  // public void setNumSlotsPerMap(int numSlotsPerMap) {
+  // this.numSlotsPerMap = numSlotsPerMap;
+  // }
+
+  public int getNumFinishedReduces() {
+    return numFinishedReduces;
+  }
+
+  public void setNumFinishedReduces(int numFinishedReduces) {
+    this.numFinishedReduces = numFinishedReduces;
+  }
+
+  public int getNumFailedReduces() {
+    return numFailedReduces;
+  }
+
+  public void setNumFailedReduces(int numFailedReduces) {
+    this.numFailedReduces = numFailedReduces;
+  }
+
+  // public int getNumSlotsPerReduce() {
+  // return numSlotsPerReduce;
+  // }
+  //
+  // public void setNumSlotsPerReduce(int numSlotsPerReduce) {
+  // this.numSlotsPerReduce = numSlotsPerReduce;
+  // }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public void setQueue(String queue) {
+    this.queue = queue;
+  }
+
+  public String getJobStatus() {
+    return jobStatus;
+  }
+
+  public void setJobStatus(String jobStatus) {
+    this.jobStatus = jobStatus;
+  }
+
+  public long getMapSlotSeconds() {
+    return mapSlotSeconds;
+  }
+
+  public void setMapSlotSeconds(long mapSlotSeconds) {
+    this.mapSlotSeconds = mapSlotSeconds;
+  }
+
+  public long getReduceSlotSeconds() {
+    return reduceSlotSeconds;
+  }
+
+  public void setReduceSlotSeconds(long reduceSlotSeconds) {
+    this.reduceSlotSeconds = reduceSlotSeconds;
+  }
+
+  // public int getClusterSlotCapacity() {
+  // return clusterSlotCapacity;
+  // }
+  //
+  // public void setClusterSlotCapacity(int clusterSlotCapacity) {
+  // this.clusterSlotCapacity = clusterSlotCapacity;
+  // }
+
+  public String getJobSummaryString() {
+    SummaryBuilder summary = new SummaryBuilder()
+      .add("jobId", jobId)
+      .add("submitTime", jobSubmitTime)
+      .add("launchTime", jobLaunchTime)
+      .add("firstMapTaskLaunchTime", firstMapTaskLaunchTime)
+      .add("firstReduceTaskLaunchTime", firstReduceTaskLaunchTime)
+      .add("finishTime", jobFinishTime)
+      .add("numMaps", numFinishedMaps + numFailedMaps)
+      .add("numReduces", numFinishedReduces + numFailedReduces)
+      .add("user", user)
+      .add("queue", queue)
+      .add("status", jobStatus)
+      .add("mapSlotSeconds", mapSlotSeconds)
+      .add("reduceSlotSeconds", reduceSlotSeconds);
+    return summary.toString();
+  }
+
+  static final char EQUALS = '=';
+  static final char[] charsToEscape = { StringUtils.COMMA, EQUALS,
+      StringUtils.ESCAPE_CHAR };
+  
+  static class SummaryBuilder {
+    final StringBuilder buffer = new StringBuilder();
+
+    // A little optimization for a very common case
+    SummaryBuilder add(String key, long value) {
+      return _add(key, Long.toString(value));
+    }
+
+    <T> SummaryBuilder add(String key, T value) {
+      return _add(key, StringUtils.escapeString(String.valueOf(value),
+          StringUtils.ESCAPE_CHAR, charsToEscape));
+    }
+
+    SummaryBuilder add(SummaryBuilder summary) {
+      if (buffer.length() > 0)
+        buffer.append(StringUtils.COMMA);
+      buffer.append(summary.buffer);
+      return this;
+    }
+
+    SummaryBuilder _add(String key, String value) {
+      if (buffer.length() > 0)
+        buffer.append(StringUtils.COMMA);
+      buffer.append(key).append(EQUALS).append(value);
+      return this;
+    }
+
+    @Override
+    public String toString() {
+      return buffer.toString();
+    }
+  }
+}
\ No newline at end of file
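
Because every value goes through StringUtils.escapeString() before being appended, commas and '=' characters inside a value cannot corrupt the record. A minimal sketch of the escaping behavior (the values are made up, and the snippet must live in the same package since SummaryBuilder is package-private):

  JobSummary.SummaryBuilder sb = new JobSummary.SummaryBuilder()
      .add("user", "alice")
      .add("queue", "root,default"); // ',' in the value is escaped
  System.out.println(sb);            // prints: user=alice,queue=root\,default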

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1130185&r1=1130184&r2=1130185&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Wed Jun  1 15:24:12 2011
@@ -1212,7 +1212,7 @@ public class JobImpl implements org.apac
         new JobFinishedEvent(TypeConverter.fromYarn(job.jobId),
           job.finishTime,
           job.succeededMapTaskCount, job.numReduceTasks, job.failedMapTaskCount,
-          job.numReduceTasks, //TODO replace finishedReduceTasks
+          job.failedReduceTaskCount,
           TypeConverter.fromYarn(job.getCounters()), //TODO replace with MapCounter
           TypeConverter.fromYarn(job.getCounters()), // TODO reduceCounters
           TypeConverter.fromYarn(job.getCounters()));

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1130185&r1=1130184&r2=1130185&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Wed Jun  1 15:24:12 2011
@@ -566,6 +566,7 @@ public abstract class TaskImpl implement
           .fromYarn(task.taskId), task.getLaunchTime(), TypeConverter
           .fromYarn(task.taskId.getTaskType()), TaskState.RUNNING.toString());
       task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
+      //TODO This is a transition from NEW to SCHEDULED, not RUNNING
 
       task.addAndScheduleAttempt();
       task.metrics.launchedTask(task);

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobIdPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobIdPBImpl.java?rev=1130185&r1=1130184&r2=1130185&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobIdPBImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobIdPBImpl.java Wed Jun  1 15:24:12 2011
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.api.records.impl.pb;
 
+import java.text.NumberFormat;
+
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobIdProto;
 import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobIdProtoOrBuilder;
@@ -27,6 +29,16 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
     
 public class JobIdPBImpl extends ProtoBase<JobIdProto> implements JobId {
+
+  protected static final String JOB = "job";
+  protected static final char SEPARATOR = '_';
+  protected static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(4);
+  }
+  
+  
   JobIdProto proto = JobIdProto.getDefaultInstance();
   JobIdProto.Builder builder = null;
   boolean viaProto = false;
@@ -116,4 +128,14 @@ public class JobIdPBImpl extends ProtoBa
   private synchronized ApplicationIdProto convertToProtoFormat(ApplicationId t) {
     return ((ApplicationIdPBImpl)t).getProto();
   }
+  
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder(JOB);
+    builder.append(SEPARATOR);
+    builder.append(getAppId().getClusterTimestamp());
+    builder.append(SEPARATOR);
+    builder.append(idFormat.format(getId()));
+    return builder.toString();
+  }
 }  
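
The new toString() reproduces the classic MapReduce job id format: the literal "job", the application's cluster timestamp, and the job id zero-padded to at least four digits, joined by underscores. With hypothetical values (clusterTimestamp=1306941997188, id=17) it yields:

  job_1306941997188_0017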

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1130185&r1=1130184&r2=1130185&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Wed Jun  1 15:24:12 2011
@@ -89,6 +89,11 @@ public class JobHistoryUtils {
   public static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
   
   /**
+   * Suffix for summary files.
+   */
+  public static final String SUMMARY_FILE_NAME_SUFFIX = ".summary";
+  
+  /**
    * Job History File extension.
    */
   public static final String JOB_HISTORY_FILE_EXTENSION = ".jhist";
@@ -231,6 +236,15 @@ public class JobHistoryUtils {
   }
   
   /**
+   * Get the intermediate summary file name for a job.
+   * @param jobId the jobId.
+   * @return the summary file name.
+   */
+  public static String getIntermediateSummaryFileName(JobId jobId) {
+    return TypeConverter.fromYarn(jobId).toString() + SUMMARY_FILE_NAME_SUFFIX;
+  }
+  
+  /**
    * Gets the conf file path for jobs in progress.
    * 
    * @param logDir the log directory prefix.
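
getIntermediateSummaryFileName() simply appends the new suffix to the stringified (TypeConverter.fromYarn'd) job id, so for the hypothetical id above the intermediate summary file would be named:

  job_1306941997188_0017.summary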

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1130185&r1=1130184&r2=1130185&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Wed Jun  1 15:24:12 2011
@@ -42,6 +42,7 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
@@ -52,6 +53,7 @@ import org.apache.hadoop.fs.RemoteIterat
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
@@ -82,6 +84,8 @@ public class JobHistory extends Abstract
   
   private static final Log LOG = LogFactory.getLog(JobHistory.class);
 
+  private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
+
   private static final Pattern DATE_PATTERN = Pattern
       .compile("([0-1]?[0-9])/([0-3]?[0-9])/((?:2[0-9])[0-9][0-9])");
 
@@ -382,8 +386,11 @@ public class JobHistory extends Abstract
           .getName());
       String confFileName = JobHistoryUtils
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
+      String summaryFileName = JobHistoryUtils
+          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
       MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), jobIndexInfo);
+          .getParent(), confFileName), new Path(fs.getPath().getParent(),
+          summaryFileName), jobIndexInfo);
       addToJobListCache(jobIndexInfo.getJobId(), metaInfo);
     }
   }
@@ -478,8 +485,11 @@ public class JobHistory extends Abstract
           .getName());
       String confFileName = JobHistoryUtils
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
+      String summaryFileName = JobHistoryUtils
+          .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
       MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-          .getParent(), confFileName), jobIndexInfo);
+          .getParent(), confFileName), new Path(fs.getPath().getParent(),
+          summaryFileName), jobIndexInfo);
       if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
         intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
       }
@@ -501,8 +511,11 @@ public class JobHistory extends Abstract
       if (jobIndexInfo.getJobId().equals(jobId)) {
         String confFileName = JobHistoryUtils
             .getIntermediateConfFileName(jobIndexInfo.getJobId());
+        String summaryFileName = JobHistoryUtils
+            .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
         MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
-            .getParent(), confFileName), jobIndexInfo);
+            .getParent(), confFileName), new Path(fs.getPath().getParent(),
+            summaryFileName), jobIndexInfo);
         return metaInfo;
       }
     }
@@ -576,7 +589,11 @@ public class JobHistory extends Abstract
             moveToDoneExecutor.execute(new Runnable() {
               @Override
               public void run() {
+                try {
                 moveToDone(metaInfo);
+                } catch (IOException e) {
+                  LOG.info("Failed to process metaInfo for job: " + metaInfo.jobIndexInfo.getJobId(),
e);
+                }
               }
             });
 
@@ -819,7 +836,7 @@ public class JobHistory extends Abstract
   
 
   
-  private void moveToDone(MetaInfo metaInfo) {
+  private void moveToDone(MetaInfo metaInfo) throws IOException {
     long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
     if (completeTime == 0) completeTime = System.currentTimeMillis();
     JobId jobId = metaInfo.getJobIndexInfo().getJobId();
@@ -839,13 +856,30 @@ public class JobHistory extends Abstract
       paths.add(confFile);
     }
     
+    //TODO Check all mi getters and setters for the conf path
+    Path summaryFile = metaInfo.getSummaryFile();
+    if (summaryFile == null) {
+      LOG.info("No summary file for job: " + jobId);
+    } else {
+      try {
+        String jobSummaryString = getJobSummary(intermediateDoneDirFc, summaryFile);
+        SUMMARY_LOG.info(jobSummaryString);
+        LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
+        intermediateDoneDirFc.delete(summaryFile, false);
+        metaInfo.setSummaryFile(null);
+      } catch (IOException e) {
+        LOG.warn("Failed to process summary file: [" + summaryFile + "]");
+        throw e;
+      }
+    }
+
     Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
     addDirectoryToSerialNumberIndex(targetDir);
     try {
       maybeMakeSubdirectory(targetDir);
     } catch (IOException e) {
-      LOG.info("Failed creating subdirectory: " + targetDir + " while attempting to move
files for jobId: " + jobId);
-      return;
+      LOG.warn("Failed creating subdirectory: " + targetDir + " while attempting to move
files for jobId: " + jobId);
+      throw e;
     }
     synchronized (metaInfo) {
       if (historyFile != null) {
@@ -853,8 +887,8 @@ public class JobHistory extends Abstract
         try {
           moveToDoneNow(historyFile, toPath);
         } catch (IOException e) {
-          LOG.info("Failed to move file: " + historyFile + " for jobId: " + jobId, e);
-          return;
+          LOG.warn("Failed to move file: " + historyFile + " for jobId: " + jobId);
+          throw e;
         }
         metaInfo.setHistoryFile(toPath);
       }
@@ -863,8 +897,8 @@ public class JobHistory extends Abstract
         try {
           moveToDoneNow(confFile, toPath);
         } catch (IOException e) {
-          LOG.info("Failed to move file: " + historyFile + " for jobId: " + jobId, e);
-          return;
+          LOG.warn("Failed to move file: " + historyFile + " for jobId: " + jobId);
+          throw e;
         }
         metaInfo.setConfFile(toPath);
       }
@@ -883,6 +917,14 @@ public class JobHistory extends Abstract
     //JobHistoryUtils.HISTORY_DONE_FILE_PERMISSION));
   }
   
+  String getJobSummary(FileContext fc, Path path) throws IOException {
+    Path qPath = fc.makeQualified(path);
+    FSDataInputStream in = fc.open(qPath);
+    String jobSummaryString = in.readUTF();
+    in.close();
+    return jobSummaryString;
+  }
+  
   private void maybeMakeSubdirectory(Path path) throws IOException {
     boolean existsInExistingCache = false;
     synchronized(existingDoneSubdirs) {
@@ -973,20 +1015,24 @@ public class JobHistory extends Abstract
   static class MetaInfo {
     private Path historyFile;
     private Path confFile; 
+    private Path summaryFile;
     JobIndexInfo jobIndexInfo;
 
-    MetaInfo(Path historyFile, Path confFile, JobIndexInfo jobIndexInfo) {
+    MetaInfo(Path historyFile, Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo) {
       this.historyFile = historyFile;
       this.confFile = confFile;
+      this.summaryFile = summaryFile;
       this.jobIndexInfo = jobIndexInfo;
       }
 
     Path getHistoryFile() { return historyFile; }
     Path getConfFile() { return confFile; }
+    Path getSummaryFile() { return summaryFile; }
     JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
     
     void setHistoryFile(Path historyFile) { this.historyFile = historyFile; }
     void setConfFile(Path confFile) {this.confFile = confFile; }
+    void setSummaryFile(Path summaryFile) { this.summaryFile = summaryFile; }
   }
   
 
@@ -1018,7 +1064,7 @@ public class JobHistory extends Abstract
            long effectiveTimestamp = getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
             if (shouldDelete(effectiveTimestamp)) {
               String confFileName = JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
-              MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(historyFile.getPath().getParent(), confFileName), jobIndexInfo);
+              MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path(historyFile.getPath().getParent(), confFileName), null, jobIndexInfo);
               delete(metaInfo);
             } else {
               halted = true;
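
Since SUMMARY_LOG is obtained via LogFactory.getLog(JobSummary.class), the summary lines logged in moveToDone() can be routed to a dedicated file purely through logging configuration. A log4j.properties sketch, where the appender name, file path, and layout are assumptions rather than part of this commit:

  # Route job summaries to their own appender (names/paths are illustrative)
  log4j.logger.org.apache.hadoop.mapreduce.jobhistory.JobSummary=INFO,JSA
  log4j.additivity.org.apache.hadoop.mapreduce.jobhistory.JobSummary=false
  log4j.appender.JSA=org.apache.log4j.FileAppender
  log4j.appender.JSA.File=/tmp/mapred-jobsummary.log
  log4j.appender.JSA.layout=org.apache.log4j.PatternLayout
  log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n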

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1130185&r1=1130184&r2=1130185&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Wed Jun  1 15:24:12 2011
@@ -19,6 +19,9 @@
 package org.apache.hadoop.mapreduce.v2.hs;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
 
 import junit.framework.Assert;
 
@@ -45,9 +48,6 @@ import org.junit.Test;
 
 public class TestJobHistoryParsing {
   private static final Log LOG = LogFactory.getLog(TestJobHistoryParsing.class);
-  //TODO FIX once final CompletedStatusStore is available
-//  private static final String STATUS_STORE_DIR_KEY =
-//    "yarn.server.nodemanager.jobstatus";
   @Test
   public void testHistoryParsing() throws Exception {
     Configuration conf = new Configuration();
@@ -71,8 +71,9 @@ public class TestJobHistoryParsing {
     Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
     FSDataInputStream in = null;
     LOG.info("JobHistoryFile is: " + historyFilePath);
+    FileContext fc = null;
     try {
-      FileContext fc = FileContext.getFileContext(conf);
+      fc = FileContext.getFileContext(conf);
       in = fc.open(fc.makeQualified(historyFilePath));
     } catch (IOException ioe) {
       LOG.info("Can not open history file: " + historyFilePath, ioe);
@@ -103,22 +104,45 @@ public class TestJobHistoryParsing {
       Assert.assertTrue("total number of task attempts ", 
           taskAttemptCount == 1);
     }
-//
-//   // Test for checking jobstats for job status store
-//    Path statusFilePath = new Path(jobstatusDir, "jobstats");
-//    try {
-//      FileContext fc = FileContext.getFileContext(statusFilePath.toUri());
-//      in = fc.open(statusFilePath);
-//    } catch (IOException ioe) {
-//      LOG.info("Can not open status file "+ ioe);
-//      throw (new Exception("Can not open status File"));
-//    }
-//    parser = new JobHistoryParser(in);
-//    jobInfo = parser.parse();
-//    Assert.assertTrue("incorrect finishedMap in job stats file ",
-//        jobInfo.getFinishedMaps() == 2);
-//    Assert.assertTrue("incorrect finishedReduces in job stats file ",
-//        jobInfo.getFinishedReduces() == 1);
+    
+    String summaryFileName = JobHistoryUtils
+        .getIntermediateSummaryFileName(jobId);
+    Path summaryFile = new Path(jobhistoryDir, summaryFileName);
+    String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
+    Assert.assertNotNull(jobSummaryString);
+
+    Map<String, String> jobSummaryElements = new HashMap<String, String>();
+    StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
+    while (strToken.hasMoreTokens()) {
+      String keypair = strToken.nextToken();
+      jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
+
+    }
+
+    Assert.assertEquals("JobId does not match", jobId.toString(),
+        jobSummaryElements.get("jobId"));
+    Assert.assertTrue("submitTime should not be 0",
+        Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
+    Assert.assertTrue("launchTime should not be 0",
+        Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
+    Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
+        Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
+    Assert
+        .assertTrue(
+            "firstReduceTaskLaunchTime should not be 0",
+            Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
+    Assert.assertTrue("finishTime should not be 0",
+        Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
+    Assert.assertEquals("Mismatch in num map slots", 2,
+        Integer.parseInt(jobSummaryElements.get("numMaps")));
+    Assert.assertEquals("Mismatch in num reduce slots", 1,
+        Integer.parseInt(jobSummaryElements.get("numReduces")));
+    Assert.assertEquals("User does not match", "mapred",
+        jobSummaryElements.get("user"));
+    Assert.assertEquals("Queue does not match", "default",
+        jobSummaryElements.get("queue"));
+    Assert.assertEquals("Status does not match", "SUCCEEDED",
+        jobSummaryElements.get("status"));
   }
 
   public static void main(String[] args) throws Exception {


