hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1234086 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/data/rumen/small-trace-test/ src/tools/org/apache/hadoop/tools/rumen/
Date Fri, 20 Jan 2012 19:30:11 GMT
Author: tucu
Date: Fri Jan 20 19:30:11 2012
New Revision: 1234086

URL: http://svn.apache.org/viewvc?rev=1234086&view=rev
Log:
Merge -r 1222694:1222695 from trunk to branch. FIXES: MAPREDUCE-3597

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedJob.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedTask.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedTaskAttempt.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobHistoryUtils.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1234086&r1=1234085&r2=1234086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Jan 20 19:30:11
2012
@@ -18,7 +18,10 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-778. Rumen Anonymizer. (Amar Kamat and Chris Douglas via amarrk)
 
   IMPROVEMENTS
-    MAPREDUCE-3375. [Gridmix] Memory Emulation system tests.
+    MAPREDUCE-3597. [Rumen] Rumen should provide APIs to access all the 
+                    job-history related information.
+
+    MAPREDUCE-3375. [Gridmix] Memory Emulation system tests. 
                     (Vinay Thota via amarrk)
 
     MAPREDUCE-2733. [Gridmix] Gridmix3 cpu emulation system tests. 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1234086&r1=1234085&r2=1234086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
Fri Jan 20 19:30:11 2012
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
@@ -61,6 +63,8 @@ import org.junit.Test;
 import static org.junit.Assert.*;
 
 public class TestRumenJobTraces {
+  private static final Log LOG = LogFactory.getLog(TestRumenJobTraces.class);
+
   @Test
   public void testSmallTrace() throws Exception {
     performSingleTest("sample-job-tracker-logs.gz",
@@ -236,11 +240,21 @@ public class TestRumenJobTraces {
       parser = new Hadoop20JHParser(ris);
       ArrayList<String> seenEvents = new ArrayList<String>(150);
 
-      getHistoryEvents(parser, seenEvents, null); // get events into seenEvents
+      // this is same as the one in input history file
+      String jobId = "job_200904211745_0002";
+      JobBuilder builder = new JobBuilder(jobId);
+
+      // get events into seenEvents
+      getHistoryEvents(parser, seenEvents, builder);
 
       // Validate the events seen by history parser from
       // history file v20-single-input-log.gz
       validateSeenHistoryEvents(seenEvents, goldLines);
+
+      ParsedJob parsedJob = builder.build();
+      // validate the obtainXXX api of ParsedJob, ParsedTask and
+      // ParsedTaskAttempt
+      validateParsedJob(parsedJob, 20, 1, true);
     } finally {
       if (parser != null) {
         parser.close();
@@ -584,9 +598,11 @@ public class TestRumenJobTraces {
       // validate resource usage metrics
       //  get the job counters
       Counters counters = job.getTaskReports(TaskType.MAP)[0].getTaskCounters();
-      
+
+      // get the parsed job
+      ParsedJob parsedJob = builder.build();
       //  get the logged job
-      LoggedJob loggedJob = builder.build();
+      LoggedJob loggedJob = parsedJob;
       //  get the logged attempts
       LoggedTaskAttempt attempt = 
         loggedJob.getMapTasks().get(0).getAttempts().get(0);
@@ -600,6 +616,10 @@ public class TestRumenJobTraces {
           counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).getValue(),
           counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES).getValue(),
           true);
+
+      // validate the obtainXXX api of ParsedJob, ParsedTask and
+      // ParsedTaskAttempt
+      validateParsedJob(parsedJob, 1, 1, false);
     } finally {
       // stop the MR cluster
       mrCluster.shutdown();
@@ -616,6 +636,142 @@ public class TestRumenJobTraces {
     }
   }
 
+  /**
+   * Verify if the obtainXXX methods of {@link ParsedJob}, {@link ParsedTask}
+   * and {@link ParsedTaskAttempt} give valid info
+   */
+  private void validateParsedJob(ParsedJob parsedJob, int numMaps,
+      int numReduces, boolean pre21JobHistory) {
+    validateParsedJobAPI(parsedJob, numMaps, numReduces, pre21JobHistory);
+
+    List<ParsedTask> maps = parsedJob.obtainMapTasks();
+    for (ParsedTask task : maps) {
+      validateParsedTask(task);
+    }
+    List<ParsedTask> reduces = parsedJob.obtainReduceTasks();
+    for (ParsedTask task : reduces) {
+      validateParsedTask(task);
+    }
+    List<ParsedTask> others = parsedJob.obtainOtherTasks();
+    for (ParsedTask task : others) {
+      validateParsedTask(task);
+    }
+  }
+
+  /** Verify if the obtainXXX methods of {@link ParsedJob} give valid info */
+  private void validateParsedJobAPI(ParsedJob parsedJob, int numMaps,
+      int numReduces, boolean pre21JobHistory) {
+    LOG.info("Validating ParsedJob.obtainXXX api... for "
+             + parsedJob.getJobID());
+    assertNotNull("Job acls in ParsedJob is null",
+                  parsedJob.obtainJobAcls());
+    assertNotNull("Job conf path in ParsedJob is null",
+                  parsedJob.obtainJobConfpath());
+
+    assertNotNull("Map Counters in ParsedJob is null",
+                  parsedJob.obtainMapCounters());
+    assertNotNull("Reduce Counters in ParsedJob is null",
+                  parsedJob.obtainReduceCounters());
+    assertNotNull("Total Counters in ParsedJob is null",
+                  parsedJob.obtainTotalCounters());
+
+    assertNotNull("Map Tasks List in ParsedJob is null",
+                  parsedJob.obtainMapTasks());
+    assertNotNull("Reduce Tasks List in ParsedJob is null",
+                  parsedJob.obtainReduceTasks());
+    assertNotNull("Other Tasks List in ParsedJob is null",
+                  parsedJob.obtainOtherTasks());
+
+    // 1 map and 1 reduce task should be there
+    assertEquals("Number of map tasks in ParsedJob is wrong",
+                 numMaps, parsedJob.obtainMapTasks().size());
+    assertEquals("Number of reduce tasks in ParsedJob is wrong",
+                 numReduces, parsedJob.obtainReduceTasks().size(), 1);
+
+    // old hadoop20 version history files don't have job-level-map-counters and
+    // job-level-reduce-counters. Only total counters exist there.
+    assertTrue("Total Counters in ParsedJob is empty",
+               parsedJob.obtainTotalCounters().size() > 0);
+    if (!pre21JobHistory) {
+      assertTrue("Map Counters in ParsedJob is empty",
+                 parsedJob.obtainMapCounters().size() > 0);
+      assertTrue("Reduce Counters in ParsedJob is empty",
+                 parsedJob.obtainReduceCounters().size() > 0);
+    }
+  }
+
+  /**
+   * Verify if the obtainXXX methods of {@link ParsedTask} and
+   * {@link ParsedTaskAttempt} give valid info
+   */
+  private void validateParsedTask(ParsedTask parsedTask) {
+    validateParsedTaskAPI(parsedTask);
+
+    List<ParsedTaskAttempt> attempts = parsedTask.obtainTaskAttempts();
+    for (ParsedTaskAttempt attempt : attempts) {
+      validateParsedTaskAttemptAPI(attempt);
+    }
+  }
+
+  /** Verify if the obtainXXX methods of {@link ParsedTask} give valid info */
+  private void validateParsedTaskAPI(ParsedTask parsedTask) {
+    LOG.info("Validating ParsedTask.obtainXXX api... for "
+             + parsedTask.getTaskID());
+    assertNotNull("Task counters in ParsedTask is null",
+                  parsedTask.obtainCounters());
+
+    if (parsedTask.getTaskStatus()
+        == Pre21JobHistoryConstants.Values.SUCCESS) {
+      // task counters should not be empty
+      assertTrue("Task counters in ParsedTask is empty",
+                 parsedTask.obtainCounters().size() > 0);
+      assertNull("Diagnostic-info is non-null for a succeeded task",
+                 parsedTask.obtainDiagnosticInfo());
+      assertNull("Failed-due-to-attemptId is non-null for a succeeded task",
+                 parsedTask.obtainFailedDueToAttemptId());
+    } else {
+      assertNotNull("Diagnostic-info is non-null for a succeeded task",
+                    parsedTask.obtainDiagnosticInfo());
+      assertNotNull("Failed-due-to-attemptId is non-null for a succeeded task",
+                    parsedTask.obtainFailedDueToAttemptId());
+    }
+
+    List<ParsedTaskAttempt> attempts = parsedTask.obtainTaskAttempts();
+    assertNotNull("TaskAttempts list in ParsedTask is null", attempts);
+    assertTrue("TaskAttempts list in ParsedTask is empty",
+               attempts.size() > 0);    
+  }
+
+  /**
+   * Verify if the obtainXXX methods of {@link ParsedTaskAttempt} give
+   * valid info
+   */
+  private void validateParsedTaskAttemptAPI(
+      ParsedTaskAttempt parsedTaskAttempt) {
+    LOG.info("Validating ParsedTaskAttempt.obtainXXX api... for "
+             + parsedTaskAttempt.getAttemptID());
+    assertNotNull("Counters in ParsedTaskAttempt is null",
+                  parsedTaskAttempt.obtainCounters());
+
+    if (parsedTaskAttempt.getResult()
+        == Pre21JobHistoryConstants.Values.SUCCESS) { 
+      assertTrue("Counters in ParsedTaskAttempt is empty",
+               parsedTaskAttempt.obtainCounters().size() > 0);
+      assertNull("Diagnostic-info is non-null for a succeeded taskAttempt",
+                 parsedTaskAttempt.obtainDiagnosticInfo());
+    } else {
+      assertNotNull("Diagnostic-info is non-null for a succeeded taskAttempt",
+                 parsedTaskAttempt.obtainDiagnosticInfo());
+    }
+    assertNotNull("TrackerName in ParsedTaskAttempt is null",
+                  parsedTaskAttempt.obtainTrackerName());
+
+    assertNotNull("http-port info in ParsedTaskAttempt is null",
+        parsedTaskAttempt.obtainHttpPort());
+    assertNotNull("Shuffle-port info in ParsedTaskAttempt is null",
+        parsedTaskAttempt.obtainShufflePort());
+  }
+
   @Test
   public void testJobConfigurationParser() throws Exception {
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz?rev=1234086&r1=1234085&r2=1234086&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz?rev=1234086&r1=1234085&r2=1234086&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz?rev=1234086&r1=1234085&r2=1234086&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=1234086&r1=1234085&r2=1234086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
Fri Jan 20 19:30:11 2012
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobFinished;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
@@ -45,14 +46,15 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinished;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFailed;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinished;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
-import org.apache.hadoop.tools.rumen.datatypes.JobProperties;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -67,16 +69,16 @@ public class JobBuilder {
 
   private boolean finalized = false;
 
-  private LoggedJob result = new LoggedJob();
+  private ParsedJob result = new ParsedJob();
 
-  private Map<String, LoggedTask> mapTasks = new HashMap<String, LoggedTask>();
-  private Map<String, LoggedTask> reduceTasks =
-      new HashMap<String, LoggedTask>();
-  private Map<String, LoggedTask> otherTasks =
-      new HashMap<String, LoggedTask>();
+  private Map<String, ParsedTask> mapTasks = new HashMap<String, ParsedTask>();
+  private Map<String, ParsedTask> reduceTasks =
+      new HashMap<String, ParsedTask>();
+  private Map<String, ParsedTask> otherTasks =
+      new HashMap<String, ParsedTask>();
 
-  private Map<String, LoggedTaskAttempt> attempts =
-      new HashMap<String, LoggedTaskAttempt>();
+  private Map<String, ParsedTaskAttempt> attempts =
+      new HashMap<String, ParsedTaskAttempt>();
 
   private Map<ParsedHost, ParsedHost> allHosts =
       new HashMap<ParsedHost, ParsedHost>();
@@ -123,7 +125,7 @@ public class JobBuilder {
   public void process(HistoryEvent event) {
     if (finalized) {
       throw new IllegalStateException(
-          "JobBuilder.process(HistoryEvent event) called after LoggedJob built");
+          "JobBuilder.process(HistoryEvent event) called after ParsedJob built");
     }
 
     // these are in lexicographical order by class name.
@@ -229,12 +231,16 @@ public class JobBuilder {
   public void process(Properties conf) {
     if (finalized) {
       throw new IllegalStateException(
-          "JobBuilder.process(Properties conf) called after LoggedJob built");
+          "JobBuilder.process(Properties conf) called after ParsedJob built");
     }
 
     //TODO remove this once the deprecate APIs in LoggedJob are removed
-    result.setQueue(extract(conf, JobConfPropertyNames.QUEUE_NAMES
-        .getCandidates(), "default"));
+    String queue = extract(conf, JobConfPropertyNames.QUEUE_NAMES
+                           .getCandidates(), null);
+    // set the queue name if existing
+    if (queue != null) {
+      result.setQueue(queue);
+    }
     result.setJobName(extract(conf, JobConfPropertyNames.JOB_NAMES
         .getCandidates(), null));
 
@@ -252,9 +258,9 @@ public class JobBuilder {
    * Request the builder to build the final object. Once called, the
    * {@link JobBuilder} would accept no more events or job-conf properties.
    * 
-   * @return Parsed {@link LoggedJob} object.
+   * @return Parsed {@link ParsedJob} object.
    */
-  public LoggedJob build() {
+  public ParsedJob build() {
     // The main job here is to build CDFs and manage the conf
     finalized = true;
 
@@ -416,7 +422,7 @@ public class JobBuilder {
   }
 
   private void processTaskUpdatedEvent(TaskUpdatedEvent event) {
-    LoggedTask task = getTask(event.getTaskId().toString());
+    ParsedTask task = getTask(event.getTaskId().toString());
     if (task == null) {
       return;
     }
@@ -424,7 +430,7 @@ public class JobBuilder {
   }
 
   private void processTaskStartedEvent(TaskStartedEvent event) {
-    LoggedTask task =
+    ParsedTask task =
         getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), true);
     task.setStartTime(event.getStartTime());
     task.setPreferredLocations(preferredLocationForSplits(event
@@ -432,7 +438,7 @@ public class JobBuilder {
   }
 
   private void processTaskFinishedEvent(TaskFinishedEvent event) {
-    LoggedTask task =
+    ParsedTask task =
         getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
     if (task == null) {
       return;
@@ -443,18 +449,22 @@ public class JobBuilder {
   }
 
   private void processTaskFailedEvent(TaskFailedEvent event) {
-    LoggedTask task =
+    ParsedTask task =
         getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
     if (task == null) {
       return;
     }
     task.setFinishTime(event.getFinishTime());
     task.setTaskStatus(getPre21Value(event.getTaskStatus()));
+    TaskFailed t = (TaskFailed)(event.getDatum());
+    task.putDiagnosticInfo(t.error.toString());
+    task.putFailedDueToAttemptId(t.failedDueToAttempt.toString());
+    // No counters in TaskFailedEvent
   }
 
   private void processTaskAttemptUnsuccessfulCompletionEvent(
       TaskAttemptUnsuccessfulCompletionEvent event) {
-    LoggedTaskAttempt attempt =
+    ParsedTaskAttempt attempt =
         getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
             event.getTaskAttemptId().toString());
 
@@ -476,20 +486,27 @@ public class JobBuilder {
     attempt.arraySetCpuUsages(event.getCpuUsages());
     attempt.arraySetVMemKbytes(event.getVMemKbytes());
     attempt.arraySetPhysMemKbytes(event.getPhysMemKbytes());
+    TaskAttemptUnsuccessfulCompletion t =
+        (TaskAttemptUnsuccessfulCompletion) (event.getDatum());
+    attempt.putDiagnosticInfo(t.error.toString());
+    // No counters in TaskAttemptUnsuccessfulCompletionEvent
   }
 
   private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
-    LoggedTaskAttempt attempt =
+    ParsedTaskAttempt attempt =
         getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
             event.getTaskAttemptId().toString());
     if (attempt == null) {
       return;
     }
     attempt.setStartTime(event.getStartTime());
+    attempt.putTrackerName(event.getTrackerName());
+    attempt.putHttpPort(event.getHttpPort());
+    attempt.putShufflePort(event.getShufflePort());
   }
 
   private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
-    LoggedTaskAttempt attempt =
+    ParsedTaskAttempt attempt =
         getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
             event.getAttemptId().toString());
     if (attempt == null) {
@@ -507,7 +524,7 @@ public class JobBuilder {
 
   private void processReduceAttemptFinishedEvent(
       ReduceAttemptFinishedEvent event) {
-    LoggedTaskAttempt attempt =
+    ParsedTaskAttempt attempt =
         getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
             event.getAttemptId().toString());
     if (attempt == null) {
@@ -536,7 +553,7 @@ public class JobBuilder {
   }
 
   private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
-    LoggedTaskAttempt attempt =
+    ParsedTaskAttempt attempt =
         getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
             event.getAttemptId().toString());
     if (attempt == null) {
@@ -568,6 +585,7 @@ public class JobBuilder {
     result.setOutcome(Pre21JobHistoryConstants.Values
         .valueOf(event.getStatus()));
     result.setFinishTime(event.getFinishTime());
+    // No counters in JobUnsuccessfulCompletionEvent
   }
 
   private void processJobSubmittedEvent(JobSubmittedEvent event) {
@@ -575,8 +593,14 @@ public class JobBuilder {
     result.setJobName(event.getJobName());
     result.setUser(event.getUserName());
     result.setSubmitTime(event.getSubmitTime());
-    // job queue name is set when conf file is processed.
-    // See JobBuilder.process(Properties) method for details.
+    result.putJobConfPath(event.getJobConfPath());
+    result.putJobAcls(event.getJobAcls());
+
+    // set the queue name if existing
+    String queue = event.getJobQueueName();
+    if (queue != null) {
+      result.setQueue(queue);
+    }
   }
 
   private void processJobStatusChangedEvent(JobStatusChangedEvent event) {
@@ -603,10 +627,19 @@ public class JobBuilder {
     result.setFinishTime(event.getFinishTime());
     result.setJobID(jobID);
     result.setOutcome(Values.SUCCESS);
+
+    JobFinished job = (JobFinished)event.getDatum();
+    Map<String, Long> countersMap =
+        JobHistoryUtils.extractCounters(job.totalCounters);
+    result.putTotalCounters(countersMap);
+    countersMap = JobHistoryUtils.extractCounters(job.mapCounters);
+    result.putMapCounters(countersMap);
+    countersMap = JobHistoryUtils.extractCounters(job.reduceCounters);
+    result.putReduceCounters(countersMap);
   }
 
-  private LoggedTask getTask(String taskIDname) {
-    LoggedTask result = mapTasks.get(taskIDname);
+  private ParsedTask getTask(String taskIDname) {
+    ParsedTask result = mapTasks.get(taskIDname);
 
     if (result != null) {
       return result;
@@ -630,9 +663,9 @@ public class JobBuilder {
    *          if true, we can create a task.
    * @return
    */
-  private LoggedTask getOrMakeTask(TaskType type, String taskIDname,
+  private ParsedTask getOrMakeTask(TaskType type, String taskIDname,
       boolean allowCreate) {
-    Map<String, LoggedTask> taskMap = otherTasks;
+    Map<String, ParsedTask> taskMap = otherTasks;
     List<LoggedTask> tasks = this.result.getOtherTasks();
 
     switch (type) {
@@ -650,10 +683,10 @@ public class JobBuilder {
       // no code
     }
 
-    LoggedTask result = taskMap.get(taskIDname);
+    ParsedTask result = taskMap.get(taskIDname);
 
     if (result == null && allowCreate) {
-      result = new LoggedTask();
+      result = new ParsedTask();
       result.setTaskType(getPre21Value(type.toString()));
       result.setTaskID(taskIDname);
       taskMap.put(taskIDname, result);
@@ -663,13 +696,13 @@ public class JobBuilder {
     return result;
   }
 
-  private LoggedTaskAttempt getOrMakeTaskAttempt(TaskType type,
+  private ParsedTaskAttempt getOrMakeTaskAttempt(TaskType type,
       String taskIDName, String taskAttemptName) {
-    LoggedTask task = getOrMakeTask(type, taskIDName, false);
-    LoggedTaskAttempt result = attempts.get(taskAttemptName);
+    ParsedTask task = getOrMakeTask(type, taskIDName, false);
+    ParsedTaskAttempt result = attempts.get(taskAttemptName);
 
     if (result == null && task != null) {
-      result = new LoggedTaskAttempt();
+      result = new ParsedTaskAttempt();
       result.setAttemptID(taskAttemptName);
       attempts.put(taskAttemptName, result);
       task.getAttempts().add(result);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobHistoryUtils.java?rev=1234086&r1=1234085&r2=1234086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobHistoryUtils.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/JobHistoryUtils.java
Fri Jan 20 19:30:11 2012
@@ -18,10 +18,15 @@
 package org.apache.hadoop.tools.rumen;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.jobhistory.JhCounter;
+import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup;
+import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 
 /**
@@ -143,4 +148,21 @@ public class JobHistoryUtils {
     String jobId = extractJobIDFromConfFileName(fileName);
     return jobId != null;
   }
+
+  /**
+   * Extract/Add counters into the Map from the given JhCounters object.
+   * @param counters the counters to be extracted from
+   * @return the map of counters
+   */
+  static Map<String, Long> extractCounters(JhCounters counters) {
+    Map<String, Long> countersMap = new HashMap<String, Long>();
+    if (counters != null) {
+      for (JhCounterGroup group : counters.groups) {
+        for (JhCounter counter : group.counts) {
+          countersMap.put(counter.name.toString(), counter.value);
+        }
+      }
+    }
+    return countersMap;
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=1234086&r1=1234085&r2=1234086&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
Fri Jan 20 19:30:11 2012
@@ -360,6 +360,10 @@ public class LoggedJob implements DeepCo
     this.relativeTime = relativeTime;
   }
 
+  /**
+   * @return job queue name if it is available in job history file or
+   *         job history conf file. Returns null otherwise.
+   */
   public QueueName getQueue() {
     return queue;
   }

Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedJob.java?rev=1234086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedJob.java
(added)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedJob.java
Fri Jan 20 19:30:11 2012
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * This is a wrapper class around {@link LoggedJob}. This provides also the
+ * extra information about the job obtained from job history which is not
+ * written to the JSON trace file.
+ */
+public class ParsedJob extends LoggedJob {
+
+  private static final Log LOG = LogFactory.getLog(ParsedJob.class);
+
+  private Map<String, Long> totalCountersMap = new HashMap<String, Long>();
+  private Map<String, Long> mapCountersMap = new HashMap<String, Long>();
+  private Map<String, Long> reduceCountersMap = new HashMap<String, Long>();
+
+  private String jobConfPath;
+  private Map<JobACL, AccessControlList> jobAcls;
+
+  ParsedJob() {
+
+  }
+
+  ParsedJob(String jobID) {
+    super();
+
+    setJobID(jobID);
+  }
+
+  /** Set the job total counters */
+  void putTotalCounters(Map<String, Long> totalCounters) {
+    this.totalCountersMap = totalCounters;
+  }
+
+  /**
+   * @return the job total counters
+   */
+  public Map<String, Long> obtainTotalCounters() {
+    return totalCountersMap;
+  }
+
+  /** Set the job level map tasks' counters */
+  void putMapCounters(Map<String, Long> mapCounters) {
+    this.mapCountersMap = mapCounters;
+  }
+
+  /**
+   * @return the job level map tasks' counters
+   */
+  public Map<String, Long> obtainMapCounters() {
+    return mapCountersMap;
+  }
+
+  /** Set the job level reduce tasks' counters */
+  void putReduceCounters(Map<String, Long> reduceCounters) {
+    this.reduceCountersMap = reduceCounters;
+  }
+
+  /**
+   * @return the job level reduce tasks' counters
+   */
+  public Map<String, Long> obtainReduceCounters() {
+    return reduceCountersMap;
+  }
+
+  /** Set the job conf path in staging dir on hdfs */
+  void putJobConfPath(String confPath) {
+    jobConfPath = confPath;
+  }
+
+  /**
+   * @return the job conf path in staging dir on hdfs
+   */
+  public String obtainJobConfpath() {
+    return jobConfPath;
+  }
+
+  /** Set the job acls */
+  void putJobAcls(Map<JobACL, AccessControlList> acls) {
+    jobAcls = acls;
+  }
+
+  /**
+   * @return the job acls
+   */
+  public Map<JobACL, AccessControlList> obtainJobAcls() {
+    return jobAcls;
+  }
+
+  /**
+   * @return the list of map tasks of this job
+   */
+  public List<ParsedTask> obtainMapTasks() {
+    List<LoggedTask> tasks = super.getMapTasks();
+    return convertTasks(tasks);
+  }
+
+  /**
+   * @return the list of reduce tasks of this job
+   */
+  public List<ParsedTask> obtainReduceTasks() {
+    List<LoggedTask> tasks = super.getReduceTasks();
+    return convertTasks(tasks);
+  }
+
+  /**
+   * @return the list of other tasks of this job
+   */
+  public List<ParsedTask> obtainOtherTasks() {
+    List<LoggedTask> tasks = super.getOtherTasks();
+    return convertTasks(tasks);
+  }
+
+  /** As we know that this list of {@link LoggedTask} objects is actually a list
+   * of {@link ParsedTask} objects, we go ahead and cast them.
+   * @return the list of {@link ParsedTask} objects
+   */
+  private List<ParsedTask> convertTasks(List<LoggedTask> tasks) {
+    List<ParsedTask> result = new ArrayList<ParsedTask>();
+
+    for (LoggedTask t : tasks) {
+      if (t instanceof ParsedTask) {
+        result.add((ParsedTask)t);
+      } else {
+        throw new RuntimeException("Unexpected type of tasks in the list...");
+      }
+    }
+    return result;
+  }
+
+  /** Dump the extra info of ParsedJob */
+  void dumpParsedJob() {
+    LOG.info("ParsedJob details:" + obtainTotalCounters() + ";"
+        + obtainMapCounters() + ";" + obtainReduceCounters()
+        + "\n" + obtainJobConfpath() + "\n" + obtainJobAcls()
+        + ";Q=" + (getQueue() == null ? "null" : getQueue().getValue()));
+    List<ParsedTask> maps = obtainMapTasks();
+    for (ParsedTask task : maps) {
+      task.dumpParsedTask();
+    }
+    List<ParsedTask> reduces = obtainReduceTasks();
+    for (ParsedTask task : reduces) {
+      task.dumpParsedTask();
+    }
+    List<ParsedTask> others = obtainOtherTasks();
+    for (ParsedTask task : others) {
+      task.dumpParsedTask();
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedTask.java?rev=1234086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedTask.java
(added)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedTask.java
Fri Jan 20 19:30:11 2012
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
+
+/**
+ * This is a wrapper class around {@link LoggedTask}. This provides also the
+ * extra information about the task obtained from job history which is not
+ * written to the JSON trace file.
+ */
+public class ParsedTask extends LoggedTask {
+
+  private static final Log LOG = LogFactory.getLog(ParsedTask.class);
+
+  private String diagnosticInfo;
+  private String failedDueToAttempt;
+  private Map<String, Long> countersMap = new HashMap<String, Long>();
+
+  ParsedTask() {
+    super();
+  }
+
+  public void incorporateCounters(JhCounters counters) {
+    Map<String, Long> countersMap =
+        JobHistoryUtils.extractCounters(counters);
+    putCounters(countersMap);
+
+    super.incorporateCounters(counters);
+  }
+
+  /** Set the task counters */
+  public void putCounters(Map<String, Long> counters) {
+    this.countersMap = counters;
+  }
+
+  /**
+   * @return the task counters
+   */
+  public Map<String, Long> obtainCounters() {
+    return countersMap;
+  }
+
+  /** Set the task diagnostic-info */
+  public void putDiagnosticInfo(String msg) {
+    diagnosticInfo = msg;
+  }
+
+  /**
+   * @return the diagnostic-info of this task.
+   *         If the task is successful, returns null.
+   */
+  public String obtainDiagnosticInfo() {
+    return diagnosticInfo;
+  }
+
+  /**
+   * Set the failed-due-to-attemptId info of this task.
+   */
+  public void putFailedDueToAttemptId(String attempt) {
+    failedDueToAttempt = attempt;
+  }
+
+  /**
+   * @return the failed-due-to-attemptId info of this task.
+   *         If the task is successful, returns null.
+   */
+  public String obtainFailedDueToAttemptId() {
+    return failedDueToAttempt;
+  }
+
+  List<ParsedTaskAttempt> obtainTaskAttempts() {
+    List<LoggedTaskAttempt> attempts = getAttempts();
+    return convertTaskAttempts(attempts);
+  }
+
+  List<ParsedTaskAttempt> convertTaskAttempts(
+      List<LoggedTaskAttempt> attempts) {
+    List<ParsedTaskAttempt> result = new ArrayList<ParsedTaskAttempt>();
+
+    for (LoggedTaskAttempt t : attempts) {
+      if (t instanceof ParsedTaskAttempt) {
+        result.add((ParsedTaskAttempt)t);
+      } else {
+        throw new RuntimeException(
+            "Unexpected type of taskAttempts in the list...");
+      }
+    }
+    return result;
+  }
+
+  /** Dump the extra info of ParsedTask */
+  void dumpParsedTask() {
+    LOG.info("ParsedTask details:" + obtainCounters()
+        + "\n" + obtainFailedDueToAttemptId()
+        + "\nPreferred Locations are:");
+    List<LoggedLocation> loc = getPreferredLocations();
+    for (LoggedLocation l : loc) {
+      LOG.info(l.getLayers() + ";" + l.toString());
+    }
+    List<ParsedTaskAttempt> attempts = obtainTaskAttempts();
+    for (ParsedTaskAttempt attempt : attempts) {
+      attempt.dumpParsedTaskAttempt();
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedTaskAttempt.java?rev=1234086&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedTaskAttempt.java
(added)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/src/tools/org/apache/hadoop/tools/rumen/ParsedTaskAttempt.java
Fri Jan 20 19:30:11 2012
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.jobhistory.JhCounters;
+
+/**
+ * This is a wrapper class around {@link LoggedTaskAttempt}. This provides
+ * also the extra information about the task attempt obtained from
+ * job history which is not written to the JSON trace file.
+ */
+public class ParsedTaskAttempt extends LoggedTaskAttempt {
+
+  private static final Log LOG = LogFactory.getLog(ParsedTaskAttempt.class);
+
+  private String diagnosticInfo;
+  private String trackerName;
+  private Integer httpPort, shufflePort;
+  private Map<String, Long> countersMap = new HashMap<String, Long>();
+
+  ParsedTaskAttempt() {
+    super();
+  }
+
+  /** incorporate event counters */
+  public void incorporateCounters(JhCounters counters) {
+
+    Map<String, Long> countersMap =
+      JobHistoryUtils.extractCounters(counters);
+    putCounters(countersMap);
+
+    super.incorporateCounters(counters);
+  }
+
+  /** Set the task attempt counters */
+  public void putCounters(Map<String, Long> counters) {
+    this.countersMap = counters;
+  }
+
+  /**
+   * @return the task attempt counters
+   */
+  public Map<String, Long> obtainCounters() {
+    return countersMap;
+  }
+
+  /** Set the task attempt diagnostic-info */
+  public void putDiagnosticInfo(String msg) {
+    diagnosticInfo = msg;
+  }
+
+  /**
+   * @return the diagnostic-info of this task attempt.
+   *         If the attempt is successful, returns null.
+   */
+  public String obtainDiagnosticInfo() {
+    return diagnosticInfo;
+  }
+
+  void putTrackerName(String trackerName) {
+    this.trackerName = trackerName;
+  }
+
+  public String obtainTrackerName() {
+    return trackerName;
+  }
+
+  void putHttpPort(int port) {
+    httpPort = port;
+  }
+
+  /**
+   * @return http port if set. Returns null otherwise.
+   */
+  public Integer obtainHttpPort() {
+    return httpPort;
+  }
+
+  void putShufflePort(int port) {
+    shufflePort = port;
+  }
+
+  /**
+   * @return shuffle port if set. Returns null otherwise.
+   */
+  public Integer obtainShufflePort() {
+    return shufflePort;
+  }
+
+  /** Dump the extra info of ParsedTaskAttempt */
+  void dumpParsedTaskAttempt() {
+    LOG.info("ParsedTaskAttempt details:" + obtainCounters()
+        + ";DiagnosticInfo=" + obtainDiagnosticInfo() + "\n"
+        + obtainTrackerName() + ";" + obtainHttpPort() + ";"
+        + obtainShufflePort() + ";rack=" + getHostName().getRackName()
+        + ";host=" + getHostName().getHostName());
+  }
+}



Mime
View raw message