hadoop-common-commits mailing list archives

From ama...@apache.org
Subject svn commit: r1185694 [7/7] - in /hadoop/common/branches/branch-0.20-security: ./ src/contrib/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/ src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/emulators/ src/contrib/gridmix/sr...
Date Tue, 18 Oct 2011 14:45:51 GMT
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Tue Oct 18 14:45:48 2011
@@ -19,6 +19,7 @@
 package org.apache.hadoop.tools.rumen;
 
 import java.io.BufferedInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -46,6 +47,7 @@ import org.apache.hadoop.mapreduce.JobID
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.TraceBuilder.MyOptions;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -363,6 +365,160 @@ public class TestRumenJobTraces {
   }
 
   /**
+   * Check if processing of input arguments is as expected when a globbed
+   * input path is passed
+   * <li> without the -recursive option and
+   * <li> with the -recursive option.
+   */
+  @Test
+  public void testProcessInputArgument() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    // define the test's root temporary directory
+    final Path rootTempDir =
+      new Path(System.getProperty("test.build.data", "/tmp"))
+          .makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+    // define the test's root input directory
+    Path testRootInputDir = new Path(rootTempDir, "TestProcessInputArgument");
+    // define the nested input directory
+    Path nestedInputDir = new Path(testRootInputDir, "1/2/3/4");
+    // define the globbed version of the nested input directory
+    Path globbedInputNestedDir =
+      lfs.makeQualified(new Path(testRootInputDir, "*/*/*/*/*"));
+    try {
+      lfs.delete(nestedInputDir, true);
+
+      List<String> recursiveInputPaths = new ArrayList<String>();
+      List<String> nonRecursiveInputPaths = new ArrayList<String>();
+      // Create input files under the given path with multiple levels of
+      // subdirectories
+      createHistoryLogsHierarchy(nestedInputDir, lfs, recursiveInputPaths,
+          nonRecursiveInputPaths);
+
+      // Check the case of globbed input path and without -recursive option
+      List<Path> inputs = MyOptions.processInputArgument(
+                              globbedInputNestedDir.toString(), conf, false);
+      validateHistoryLogPaths(inputs, nonRecursiveInputPaths);
+      // Check the case of globbed input path and with -recursive option
+      inputs = MyOptions.processInputArgument(
+                   globbedInputNestedDir.toString(), conf, true);
+      validateHistoryLogPaths(inputs, recursiveInputPaths);
+
+    } finally {
+      lfs.delete(testRootInputDir, true);
+    }
+  }
+
+  /**
+   * Validate if the input history log paths are as expected.
+   * @param inputs  the resultant input paths to be validated
+   * @param expectedHistoryFileNames  the expected input history logs
+   * @throws IOException
+   */
+  private void validateHistoryLogPaths(List<Path> inputs,
+      List<String> expectedHistoryFileNames) throws IOException {
+
+    System.out.println("\nExpected history files are:");
+    for (String historyFile : expectedHistoryFileNames) {
+      System.out.println(historyFile);
+    }
+    System.out.println("\nResultant history files are:");
+    List<String> historyLogs = new ArrayList<String>();
+    for (Path p : inputs) {
+      historyLogs.add(p.toUri().getPath());
+      System.out.println(p.toUri().getPath());
+    }
+
+    assertEquals("Number of history logs found is different from the expected.",
+        expectedHistoryFileNames.size(), inputs.size());
+
+    // Verify if all the history logs are expected ones and they are in the
+    // expected order
+    assertTrue("Some of the history log files do not match the expected.",
+        historyLogs.equals(expectedHistoryFileNames));
+  }
+
+  /**
+   * Create history logs under the given path with multiple levels of
+   * subdirectories as shown below.
+   * <br>
+   * Create a file, an empty subdirectory and a nonempty subdirectory
+   * &lt;historyDir&gt; under the given input path.
+   * <br>
+   * The subdirectory &lt;historyDir&gt; contains the following dir structure:
+   * <br>
+   * <br>&lt;historyDir&gt;/historyFile1.txt
+   * <br>&lt;historyDir&gt;/historyFile1.gz
+   * <br>&lt;historyDir&gt;/subDir1/historyFile2.txt
+   * <br>&lt;historyDir&gt;/subDir1/historyFile2.gz
+   * <br>&lt;historyDir&gt;/subDir2/historyFile3.txt
+   * <br>&lt;historyDir&gt;/subDir2/historyFile3.gz
+   * <br>&lt;historyDir&gt;/subDir1/subDir11/historyFile4.txt
+   * <br>&lt;historyDir&gt;/subDir1/subDir11/historyFile4.gz
+   * <br>&lt;historyDir&gt;/subDir2/subDir21/
+   * <br>
+   * Create the lists of input paths that should be processed by TraceBuilder
+   * for recursive case and non-recursive case.
+   * @param nestedInputDir the input history logs directory under which
+   *                       history files are created in nested subdirectories
+   * @param fs         FileSystem of the input paths
+   * @param recursiveInputPaths input paths for recursive case
+   * @param nonRecursiveInputPaths input paths for non-recursive case
+   * @throws IOException
+   */
+  private void createHistoryLogsHierarchy(Path nestedInputDir, FileSystem fs,
+      List<String> recursiveInputPaths, List<String> nonRecursiveInputPaths)
+  throws IOException {
+    List<Path> dirs = new ArrayList<Path>();
+    // define a file in the nested test input directory
+    Path inputPath1 = new Path(nestedInputDir, "historyFile.txt");
+    // define an empty sub-folder in the nested test input directory
+    Path emptyDir = new Path(nestedInputDir, "emptyDir");
+    // define a nonempty sub-folder in the nested test input directory
+    Path historyDir = new Path(nestedInputDir, "historyDir");
+
+    fs.mkdirs(nestedInputDir);
+    // Create an empty input file
+    fs.createNewFile(inputPath1);
+    // Create empty subdir
+    fs.mkdirs(emptyDir);// let us not create any files under this dir
+
+    fs.mkdirs(historyDir);
+    dirs.add(historyDir);
+
+    Path subDir1 = new Path(historyDir, "subDir1");
+    fs.mkdirs(subDir1);
+    dirs.add(subDir1);
+    Path subDir2 = new Path(historyDir, "subDir2");
+    fs.mkdirs(subDir2);
+    dirs.add(subDir2);
+
+    Path subDir11 = new Path(subDir1, "subDir11");
+    fs.mkdirs(subDir11);
+    dirs.add(subDir11);
+    Path subDir21 = new Path(subDir2, "subDir21");
+    fs.mkdirs(subDir21);// let us not create any files under this dir
+
+    int i = 0;
+    for (Path dir : dirs) {
+      i++;
+      Path gzPath = new Path(dir, "historyFile" + i + ".gz");
+      Path txtPath = new Path(dir, "historyFile" + i + ".txt");
+      fs.createNewFile(txtPath);
+      fs.createNewFile(gzPath);
+      recursiveInputPaths.add(gzPath.toUri().getPath());
+      recursiveInputPaths.add(txtPath.toUri().getPath());
+      if (i == 1) {
+        nonRecursiveInputPaths.add(gzPath.toUri().getPath());
+        nonRecursiveInputPaths.add(txtPath.toUri().getPath());
+      }
+    }
+    recursiveInputPaths.add(inputPath1.toUri().getPath());
+    nonRecursiveInputPaths.add(inputPath1.toUri().getPath());
+  }
+
+  /**
    * Test if {@link CurrentJHParser} can read events from current JH files.
    */
   @Test
@@ -426,7 +582,7 @@ public class TestRumenJobTraces {
 
       // Test if the JobHistoryParserFactory can detect the parser correctly
       parser = JobHistoryParserFactory.getParser(ris);
-        
+
       HistoryEvent e;
       while ((e = parser.nextEvent()) != null) {
         String eventString = e.getEventType().toString();
@@ -470,71 +626,267 @@ public class TestRumenJobTraces {
     }
   }
   
-  @Test
-  public void testJobConfigurationParser() throws Exception {
-    String[] list1 =
-        { "mapred.job.queue.name", "mapreduce.job.name",
-            "mapred.child.java.opts" };
+    /**
+     * Test if the {@link JobConfigurationParser} can correctly extract
+     * key-value pairs from the job configuration.
+     */
+    @Test
+    public void testJobConfigurationParsing() throws Exception {
+      final FileSystem lfs = FileSystem.getLocal(new Configuration());
+  
+      final Path rootTempDir =
+          new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+              lfs.getUri(), lfs.getWorkingDirectory());
+  
+      final Path tempDir = new Path(rootTempDir, "TestJobConfigurationParser");
+      lfs.delete(tempDir, true);
+  
+      // Add some configuration parameters to the conf
+      JobConf jConf = new JobConf(false);
+      String key = "test.data";
+      String value = "hello world";
+      jConf.set(key, value);
+      
+      // create the job conf file
+      Path jobConfPath = new Path(tempDir.toString(), "job.xml");
+      lfs.delete(jobConfPath, false);
+      DataOutputStream jobConfStream = lfs.create(jobConfPath);
+      jConf.writeXml(jobConfStream);
+      jobConfStream.close();
+      
+      // now read the job conf file using the job configuration parser
+      Properties properties = 
+        JobConfigurationParser.parse(lfs.open(jobConfPath));
+      
+      // check if the required parameter is loaded
+      assertEquals("Total number of extracted properties (" + properties.size() 
+                   + ") doesn't match the expected size of 1 ["
+                   + "JobConfigurationParser]",
+                   1, properties.size());
+      // check if the key is present in the extracted configuration
+      assertTrue("Key " + key + " is missing in the configuration extracted "
+                 + "[JobConfigurationParser]",
+                 properties.keySet().contains(key));
+      // check if the desired property has the correct value
+      assertEquals("JobConfigurationParser couldn't recover the parameters"
+                   + " correctly",
+                  value, properties.get(key));
+      
+      // Test ZombieJob
+      LoggedJob job = new LoggedJob();
+      job.setJobProperties(properties);
+      
+      ZombieJob zjob = new ZombieJob(job, null);
+      Configuration zconf = zjob.getJobConf();
+      // check if the required parameter is loaded
+      assertEquals("ZombieJob couldn't recover the parameters correctly", 
+                   value, zconf.get(key));
+    }
 
-    String[] list2 = { "mapred.job.queue.name", "mapred.child.java.opts" };
+    @Test
+    public void testJobConfigurationParser() throws Exception {
 
-    List<String> interested1 = new ArrayList<String>();
-    for (String interested : list1) {
-      interested1.add(interested);
+      // Validate parser with old mapred config properties from
+      // sample-conf.file.xml
+      validateJobConfParser("sample-conf.file.xml");
     }
 
-    List<String> interested2 = new ArrayList<String>();
-    for (String interested : list2) {
-      interested2.add(interested);
-    }
+    private void validateJobConfParser(String confFile) throws Exception {
 
-    JobConfigurationParser jcp1 = new JobConfigurationParser(interested1);
-    JobConfigurationParser jcp2 = new JobConfigurationParser(interested2);
+      final Configuration conf = new Configuration();
+      final FileSystem lfs = FileSystem.getLocal(conf);
 
-    final Configuration conf = new Configuration();
-    final FileSystem lfs = FileSystem.getLocal(conf);
+      @SuppressWarnings("deprecation")
+      final Path rootInputDir =
+          new Path(System.getProperty("test.tools.input.dir", ""))
+              .makeQualified(lfs);
 
-    @SuppressWarnings("deprecation")
-    final Path rootInputDir =
-        new Path(System.getProperty("test.tools.input.dir", ""))
-            .makeQualified(lfs);
+      final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
 
-    final Path rootInputPath = new Path(rootInputDir, "rumen/small-trace-test");
+      final Path inputPath = new Path(rootInputPath, confFile);
 
-    final Path inputPath = new Path(rootInputPath, "sample-conf.file.xml");
+      InputStream inputConfStream =
+          new PossiblyDecompressedInputStream(inputPath, conf);
 
-    InputStream inputConfStream =
-        new PossiblyDecompressedInputStream(inputPath, conf);
+      try {
+        Properties props = JobConfigurationParser.parse(inputConfStream);
+        inputConfStream.close();
 
-    try {
-      Properties props1 = jcp1.parse(inputConfStream);
-      inputConfStream.close();
+        // Make sure that the parser extracts the expected properties into
+        // props.
+        assertEquals("Config property for job queue name is not "
+            + " extracted properly.", "TheQueue",
+            JobBuilder.extract(props, JobConfPropertyNames.QUEUE_NAMES
+            .getCandidates(), null));
+        assertEquals("Config property for job name is not "
+            + " extracted properly.", "MyMRJob",
+            JobBuilder.extract(props, JobConfPropertyNames.JOB_NAMES
+            .getCandidates(), null));
 
-      inputConfStream = new PossiblyDecompressedInputStream(inputPath, conf);
-      Properties props2 = jcp2.parse(inputConfStream);
+        validateChildJavaOpts(props);
 
-      assertEquals("testJobConfigurationParser: wrong number of properties", 3,
-          props1.size());
-      assertEquals("testJobConfigurationParser: wrong number of properties", 2,
-          props2.size());
-
-      assertEquals("prop test 1", "TheQueue", props1
-          .get("mapred.job.queue.name"));
-      assertEquals("prop test 2", "job_0001", props1.get("mapreduce.job.name"));
-      assertEquals("prop test 3",
-          "-server -Xmx640m -Djava.net.preferIPv4Stack=true", props1
-              .get("mapred.child.java.opts"));
-      assertEquals("prop test 4", "TheQueue", props2
-          .get("mapred.job.queue.name"));
-      assertEquals("prop test 5",
-          "-server -Xmx640m -Djava.net.preferIPv4Stack=true", props2
-              .get("mapred.child.java.opts"));
+      } finally {
+        inputConfStream.close();
+      }
+    }
+    
+    // Validate child java opts in properties.
+    private void validateChildJavaOpts(Properties props) {
+      // if old property mapred.child.java.opts is set, then extraction of all
+      // the following 3 properties should give that value.
+      assertEquals("mapred.child.java.opts is not extracted properly.",
+          "-server -Xmx640m -Djava.net.preferIPv4Stack=true",
+          JobBuilder.extract(props, JobConfPropertyNames.TASK_JAVA_OPTS_S
+          .getCandidates(), null));
+      assertEquals("New config property " + JobConf.MAPRED_MAP_TASK_JAVA_OPTS
+          + " is not extracted properly when the old config property "
+          + "mapred.child.java.opts is set.",
+          "-server -Xmx640m -Djava.net.preferIPv4Stack=true",
+          JobBuilder.extract(props, JobConfPropertyNames.MAP_JAVA_OPTS_S
+          .getCandidates(), null));
+      assertEquals("New config property " + JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS
+          + " is not extracted properly when the old config property "
+          + "mapred.child.java.opts is set.",
+          "-server -Xmx640m -Djava.net.preferIPv4Stack=true",
+          JobBuilder.extract(props, JobConfPropertyNames.REDUCE_JAVA_OPTS_S
+          .getCandidates(), null));
+    }
 
-    } finally {
-      inputConfStream.close();
+  /**
+   * Test {@link ResourceUsageMetrics}.
+   */
+  @Test
+  public void testResourceUsageMetrics() throws Exception {
+    final long cpuUsage = 100;
+    final long pMemUsage = 200;
+    final long vMemUsage = 300;
+    final long heapUsage = 400;
+    
+    // test ResourceUsageMetrics's setters
+    ResourceUsageMetrics metrics = new ResourceUsageMetrics();
+    metrics.setCumulativeCpuUsage(cpuUsage);
+    metrics.setPhysicalMemoryUsage(pMemUsage);
+    metrics.setVirtualMemoryUsage(vMemUsage);
+    metrics.setHeapUsage(heapUsage);
+    // test cpu usage value
+    assertEquals("Cpu usage values mismatch via set", cpuUsage, 
+                 metrics.getCumulativeCpuUsage());
+    // test pMem usage value
+    assertEquals("Physical memory usage values mismatch via set", pMemUsage, 
+                 metrics.getPhysicalMemoryUsage());
+    // test vMem usage value
+    assertEquals("Virtual memory usage values mismatch via set", vMemUsage, 
+                 metrics.getVirtualMemoryUsage());
+    // test heap usage value
+    assertEquals("Heap usage values mismatch via set", heapUsage, 
+                 metrics.getHeapUsage());
+    
+    // test deepCompare() (pass case)
+    testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, 
+                                          pMemUsage, heapUsage, true);
+    
+    // test deepCompare (fail case)
+    // test cpu usage mismatch
+    testResourceUsageMetricViaDeepCompare(metrics, 0, vMemUsage, pMemUsage, 
+                                          heapUsage, false);
+    // test pMem usage mismatch
+    testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, 0, 
+                                          heapUsage, false);
+    // test vMem usage mismatch
+    testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, 0, pMemUsage, 
+                                          heapUsage, false);
+    // test heap usage mismatch
+    testResourceUsageMetricViaDeepCompare(metrics, cpuUsage, vMemUsage, 
+                                          pMemUsage, 0, false);
+    
+    // define a metric with a fixed value of size()
+    ResourceUsageMetrics metrics2 = new ResourceUsageMetrics() {
+      @Override
+      public int size() {
+        return -1;
+      }
+    };
+    metrics2.setCumulativeCpuUsage(cpuUsage);
+    metrics2.setPhysicalMemoryUsage(pMemUsage);
+    metrics2.setVirtualMemoryUsage(vMemUsage);
+    metrics2.setHeapUsage(heapUsage);
+    
+    // test with size mismatch
+    testResourceUsageMetricViaDeepCompare(metrics2, cpuUsage, vMemUsage, 
+                                          pMemUsage, heapUsage, false);
+  }
+  
+  // test ResourceUsageMetrics' deepCompare() method
+  private static void testResourceUsageMetricViaDeepCompare(
+                        ResourceUsageMetrics metrics, long cpuUsage, 
+                        long vMemUsage, long pMemUsage, long heapUsage,
+                        boolean shouldPass) {
+    ResourceUsageMetrics testMetrics = new ResourceUsageMetrics();
+    testMetrics.setCumulativeCpuUsage(cpuUsage);
+    testMetrics.setPhysicalMemoryUsage(pMemUsage);
+    testMetrics.setVirtualMemoryUsage(vMemUsage);
+    testMetrics.setHeapUsage(heapUsage);
+    
+    Boolean passed = null;
+    try {
+      metrics.deepCompare(testMetrics, new TreePath(null, "<root>"));
+      passed = true;
+    } catch (DeepInequalityException die) {
+      passed = false;
     }
+    
+    assertEquals("ResourceUsageMetrics deepCompare() failed!", 
+                 shouldPass, passed);
   }
-
+  
+  /**
+   * Testing {@link ResourceUsageMetrics} using {@link HadoopLogsAnalyzer}.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testResourceUsageMetricsWithHadoopLogsAnalyzer() 
+  throws IOException {
+    Configuration conf = new Configuration();
+    // get the input trace file
+    Path rootInputDir =
+      new Path(System.getProperty("test.tools.input.dir", ""));
+    Path rootInputSubFolder = new Path(rootInputDir, "rumen/small-trace-test");
+    Path traceFile = new Path(rootInputSubFolder, "v20-resource-usage-log.gz");
+    
+    FileSystem lfs = FileSystem.getLocal(conf);
+    
+    // define the root test directory
+    Path rootTempDir =
+        new Path(System.getProperty("test.build.data", "/tmp"));
+
+    // define output directory
+    Path outputDir = 
+      new Path(rootTempDir, "testResourceUsageMetricsWithHadoopLogsAnalyzer");
+    lfs.delete(outputDir, true);
+    lfs.deleteOnExit(outputDir);
+    
+    // run HadoopLogsAnalyzer
+    HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
+    analyzer.setConf(conf);
+    Path traceOutput = new Path(outputDir, "trace.json");
+    analyzer.run(new String[] {"-write-job-trace", traceOutput.toString(), 
+                               "-v1", traceFile.toString()});
+    
+    // test HadoopLogsAnalyzer's output w.r.t. ResourceUsageMetrics
+    //  open the output trace file
+    JsonObjectMapperParser<LoggedJob> traceParser =
+      new JsonObjectMapperParser<LoggedJob>(traceOutput, LoggedJob.class, 
+                                            conf);
+    
+    //  get the logged job from the output trace file
+    LoggedJob job = traceParser.getNext();
+    LoggedTaskAttempt attempt = job.getMapTasks().get(0).getAttempts().get(0);
+    ResourceUsageMetrics metrics = attempt.getResourceUsageMetrics();
+    
+    //  test via deepCompare()
+    testResourceUsageMetricViaDeepCompare(metrics, 200, 100, 75, 50, true);
+  }
+  
   @Test
   public void testTopologyBuilder() throws Exception {
     final TopologyBuilder subject = new TopologyBuilder();
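
For context on the -recursive switch exercised by testProcessInputArgument above, the following is a minimal sketch (not part of this commit) of driving TraceBuilder through ToolRunner with that option; the input and output paths are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.rumen.TraceBuilder;
    import org.apache.hadoop.util.ToolRunner;

    public class TraceBuilderRecursiveSketch {
      public static void main(String[] args) throws Exception {
        // argument layout: [options] <trace-output> <topology-output> <input>...
        int exitCode = ToolRunner.run(new Configuration(), new TraceBuilder(),
            new String[] {
                "-recursive",                 // scan input directories recursively
                "file:///tmp/trace.json",     // placeholder trace output
                "file:///tmp/topology.json",  // placeholder topology output
                "file:///tmp/history-logs"    // placeholder job history input dir
            });
        System.exit(exitCode);
      }
    }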

Modified: hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/job-tracker-logs-trace-output.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/job-tracker-logs-trace-output.gz?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/sample-conf.file.xml Tue Oct 18 14:45:48 2011
@@ -19,11 +19,14 @@
  */
 -->
 <configuration>
+<!--
+Old mapred config properties
+-->
    <property>
       <name>mapred.job.queue.name</name><value>TheQueue</value>
    </property>
    <property>
-      <name>mapreduce.job.name</name><value>job_0001</value>
+      <name>mapred.job.name</name><value>MyMRJob</value>
    </property>
    <property>
       <name>maproduce.uninteresting.property</name><value>abcdef</value>

Modified: hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/truncated-trace-output
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/truncated-trace-output?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/truncated-trace-output (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/truncated-trace-output Tue Oct 18 14:45:48 2011
@@ -3,6 +3,9 @@
   "user" : "hadoopqa",
   "jobName" : null,
   "jobID" : "job_200904211745_0002",
+   "jobProperties" : {
+    "mapred.child.java.opts" : "-server -Xmx640m -Djava.net.preferIPv4Stack=true"
+  },
   "mapTasks" : [ {
     "startTime" : 1240336753705,
     "attempts" : [ {

Added: hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz?rev=1185694&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/common/branches/branch-0.20-security/src/test/tools/data/rumen/small-trace-test/v20-resource-usage-log.gz
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java Tue Oct 18 14:45:48 2011
@@ -54,13 +54,13 @@ public interface ClusterStory {
   /**
    * Get {@link MachineNode} by its host name.
    * 
-   * @return The {@line MachineNode} with the same name. Or null if not found.
+   * @return The {@link MachineNode} with the same name. Or null if not found.
    */
   public MachineNode getMachineByName(String name);
   
   /**
    * Get {@link RackNode} by its name.
-   * @return The {@line RackNode} with the same name. Or null if not found.
+   * @return The {@link RackNode} with the same name. Or null if not found.
    */
   public RackNode getRackByName(String name);
 

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/DeskewedJobTraceReader.java Tue Oct 18 14:45:48 2011
@@ -72,7 +72,7 @@ public class DeskewedJobTraceReader impl
    * 
    * @param reader
    *          the {@link JobTraceReader} that's being protected
-   * @param skewBufferSize
+   * @param skewBufferLength
    *          [the number of late jobs that can preced a later out-of-order
    *          earlier job
    * @throws IOException

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Tue Oct 18 14:45:48 2011
@@ -1208,6 +1208,38 @@ public class HadoopLogsAnalyzer extends 
         attempt.spilledRecords = val;
       }
     }, counterString, "SPILLED_RECORDS");
+    
+    // incorporate CPU usage
+    incorporateCounter(new SetField(attempt2) {
+      @Override
+      void set(long val) {
+        attempt.getResourceUsageMetrics().setCumulativeCpuUsage(val);
+      }
+    }, counterString, "CPU_MILLISECONDS");
+    
+    // incorporate virtual memory usage
+    incorporateCounter(new SetField(attempt2) {
+      @Override
+      void set(long val) {
+        attempt.getResourceUsageMetrics().setVirtualMemoryUsage(val);
+      }
+    }, counterString, "VIRTUAL_MEMORY_BYTES");
+    
+    // incorporate physical memory usage
+    incorporateCounter(new SetField(attempt2) {
+      @Override
+      void set(long val) {
+        attempt.getResourceUsageMetrics().setPhysicalMemoryUsage(val);
+      }
+    }, counterString, "PHYSICAL_MEMORY_BYTES");
+    
+    // incorporate heap usage
+    incorporateCounter(new SetField(attempt2) {
+      @Override
+      void set(long val) {
+        attempt.getResourceUsageMetrics().setHeapUsage(val);
+      }
+    }, counterString, "COMMITTED_HEAP_BYTES");
   }
 
   private ParsedHost getAndRecordParsedHost(String hostName) {
@@ -1594,6 +1626,8 @@ public class HadoopLogsAnalyzer extends 
       jobBeingTraced.setJobMapMB(jobconf.jobMapMB);
       jobBeingTraced.setJobReduceMB(jobconf.jobReduceMB);
 
+      jobBeingTraced.setJobProperties(jobconf.properties);
+      
       jobconf = null;
 
       finalizeJob();

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java Tue Oct 18 14:45:48 2011
@@ -74,6 +74,8 @@ public class JobBuilder {
   private static final Pattern heapPattern =
       Pattern.compile("-Xmx([0-9]+[kKmMgGtT])");
 
+  private Properties jobConfigurationParameters = null;
+
   public JobBuilder(String jobID) {
     this.jobID = jobID;
   }
@@ -142,7 +144,7 @@ public class JobBuilder {
           "JobBuilder.process(HistoryEvent): unknown event type");
   }
 
-  private String extract(Properties conf, String[] names, String defaultValue) {
+  static String extract(Properties conf, String[] names, String defaultValue) {
     for (String name : names) {
       String result = conf.getProperty(name);
 
@@ -206,6 +208,7 @@ public class JobBuilder {
           "JobBuilder.process(Properties conf) called after LoggedJob built");
     }
 
+    //TODO remove this once the deprecated APIs in LoggedJob are removed
     result.setQueue(extract(conf, JobConfPropertyNames.QUEUE_NAMES
         .getCandidates(), "default"));
     result.setJobName(extract(conf, JobConfPropertyNames.JOB_NAMES
@@ -217,6 +220,8 @@ public class JobBuilder {
         JobConfPropertyNames.MAP_JAVA_OPTS_S.getCandidates()));
     maybeSetJobReduceMB(extractMegabytes(conf,
         JobConfPropertyNames.REDUCE_JAVA_OPTS_S.getCandidates()));
+        
+    this.jobConfigurationParameters = conf;
   }
 
   /**
@@ -226,9 +231,12 @@ public class JobBuilder {
    * @return Parsed {@link LoggedJob} object.
    */
   public LoggedJob build() {
-    // The main job here is to build CDFs
+    // The main job here is to build CDFs and manage the conf
     finalized = true;
 
+    // set the conf
+    result.setJobProperties(jobConfigurationParameters);
+    
     // initialize all the per-job statistics gathering places
     Histogram[] successfulMapAttemptTimes =
         new Histogram[ParsedHost.numberOfDistances() + 1];

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java Tue Oct 18 14:45:48 2011
@@ -17,13 +17,9 @@
  */
 package org.apache.hadoop.tools.rumen;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Properties;
-import java.util.Set;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -38,22 +34,11 @@ import org.xml.sax.SAXException;
 
 /**
  * {@link JobConfigurationParser} parses the job configuration xml file, and
- * extracts various framework specific properties. It parses the file using a
+ * extracts configuration properties. It parses the file using a
  * stream-parser and thus is more memory efficient. [This optimization may be
  * postponed for a future release]
  */
 public class JobConfigurationParser {
-  final private Set<String> interested;
-
-  /**
-   * Constructor
-   * 
-   * @param interested
-   *          properties we should extract from the job configuration xml.
-   */
-  public JobConfigurationParser(List<String> interested) {
-    this.interested = new HashSet<String>(interested);
-  }
 
   /**
    * Parse the job configuration file (as an input stream) and return a
@@ -66,7 +51,7 @@ public class JobConfigurationParser {
    *         configuration xml.
    * @throws IOException
    */
-  Properties parse(InputStream input) throws IOException {
+  static Properties parse(InputStream input) throws IOException {
     Properties result = new Properties();
 
     try {
@@ -117,7 +102,7 @@ public class JobConfigurationParser {
           }
         }
 
-        if (interested.contains(attr) && value != null) {
+        if (attr != null && value != null) {
           result.put(attr, value);
         }
       }
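
As a usage note (not part of the patch): after this change JobConfigurationParser.parse() is static and extracts every name/value pair, so callers no longer supply a list of "interesting" keys. A minimal sketch follows; it assumes it is compiled into the org.apache.hadoop.tools.rumen package because parse() is package-private, and the job.xml path is a placeholder.

    package org.apache.hadoop.tools.rumen;

    import java.io.InputStream;
    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class JobConfParseSketch {
      public static void main(String[] args) throws Exception {
        FileSystem lfs = FileSystem.getLocal(new Configuration());
        // any job configuration xml written by JobConf.writeXml() will do
        InputStream in = lfs.open(new Path("/tmp/job.xml"));
        try {
          Properties props = JobConfigurationParser.parse(in);
          System.out.println(props.getProperty("mapred.job.queue.name"));
        } finally {
          in.close();
        }
      }
    }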

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java Tue Oct 18 14:45:48 2011
@@ -38,7 +38,7 @@ public class JobHistoryParserFactory {
     throw new IOException("No suitable parser.");
   }
 
-  enum VersionDetector {
+  public enum VersionDetector {
     Hadoop20() {
 
       @Override

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Tue Oct 18 14:45:48 2011
@@ -22,6 +22,8 @@ package org.apache.hadoop.tools.rumen;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -92,6 +94,8 @@ public class LoggedJob implements DeepCo
   double[] mapperTriesToSucceed;
   double failedMapperFraction; // !!!!!
 
+  private Properties jobProperties = new Properties();
+  
   LoggedJob() {
 
   }
@@ -102,6 +106,20 @@ public class LoggedJob implements DeepCo
     setJobID(jobID);
   }
 
+  /**
+   * Set the configuration properties of the job.
+   */
+  void setJobProperties(Properties conf) {
+    this.jobProperties = conf;
+  }
+  
+  /**
+   * Get the configuration properties of the job.
+   */
+  public Properties getJobProperties() {
+    return jobProperties;
+  }
+  
   void adjustTimes(long adjustment) {
     submitTime += adjustment;
     launchTime += adjustment;
@@ -537,6 +555,35 @@ public class LoggedJob implements DeepCo
     }
   }
 
+  private void compareJobProperties(Properties prop1, Properties prop2,
+                                    TreePath loc, String eltname) 
+  throws DeepInequalityException {
+    if (prop1 == null && prop2 == null) {
+      return;
+    }
+
+    if (prop1 == null || prop2 == null) {
+      throw new DeepInequalityException(eltname + " miscompared [null]", 
+                                        new TreePath(loc, eltname));
+    }
+
+    if (prop1.size() != prop2.size()) {
+      throw new DeepInequalityException(eltname + " miscompared [size]", 
+                                        new TreePath(loc, eltname));
+    }
+    
+    for (Map.Entry<Object, Object> entry : prop1.entrySet()) {
+      Object v1 = entry.getValue();
+      Object v2 = prop2.get(entry.getKey());
+      if (v1 == null || v2 == null || !v1.equals(v2)) {
+        throw new DeepInequalityException(
+          eltname + " miscompared for value of key : " 
+            + entry.getKey().toString(), 
+          new TreePath(loc, eltname));
+      }
+    }
+  }
+  
   public void deepCompare(DeepCompare comparand, TreePath loc)
       throws DeepInequalityException {
     if (!(comparand instanceof LoggedJob)) {
@@ -600,5 +647,9 @@ public class LoggedJob implements DeepCo
     compare1(clusterReduceMB, other.clusterReduceMB, loc, "clusterReduceMB");
     compare1(jobMapMB, other.jobMapMB, loc, "jobMapMB");
     compare1(jobReduceMB, other.jobReduceMB, loc, "jobReduceMB");
+
+    // compare the job configuration parameters
+    compareJobProperties(jobProperties, other.getJobProperties(), loc, 
+                         "JobProperties");
   }
 }

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Tue Oct 18 14:45:48 2011
@@ -64,6 +64,9 @@ public class LoggedTaskAttempt implement
 
   LoggedLocation location;
 
+  // Initialize to default object for backward compatibility
+  ResourceUsageMetrics metrics = new ResourceUsageMetrics();
+  
   LoggedTaskAttempt() {
     super();
   }
@@ -349,8 +352,50 @@ public class LoggedTaskAttempt implement
         attempt.spilledRecords = val;
       }
     }, counters, "SPILLED_RECORDS");
+    
+    // incorporate CPU usage
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        metrics.setCumulativeCpuUsage(val);
+      }
+    }, counters, "CPU_MILLISECONDS");
+    
+    // incorporate virtual memory usage
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        metrics.setVirtualMemoryUsage(val);
+      }
+    }, counters, "VIRTUAL_MEMORY_BYTES");
+    
+    // incorporate physical memory usage
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        metrics.setPhysicalMemoryUsage(val);
+      }
+    }, counters, "PHYSICAL_MEMORY_BYTES");
+    
+    // incorporate heap usage
+    incorporateCounter(new SetField(this) {
+      @Override
+      void set(long val) {
+        metrics.setHeapUsage(val);
+      }
+    }, counters, "COMMITTED_HEAP_BYTES");
   }
 
+  // Get the resource usage metrics
+  public ResourceUsageMetrics getResourceUsageMetrics() {
+    return metrics;
+  }
+  
+  // Set the resource usage metrics
+  void setResourceUsageMetrics(ResourceUsageMetrics metrics) {
+    this.metrics = metrics;
+  }
+  
   private static String canonicalizeCounterName(String nonCanonicalName) {
     String result = nonCanonicalName.toLowerCase();
 

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/Node.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/Node.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/Node.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/Node.java Tue Oct 18 14:45:48 2011
@@ -24,7 +24,7 @@ import java.util.TreeSet;
 
 /**
  * {@link Node} represents a node in the cluster topology. A node can be a
- * {@MachineNode}, or a {@link RackNode}, etc.
+ * {@link MachineNode}, or a {@link RackNode}, etc.
  */
 public class Node implements Comparable<Node> {
   private static final SortedSet<Node> EMPTY_SET = 

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java Tue Oct 18 14:45:48 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.tools.rumen;
 
+import java.util.Properties;
 import java.util.regex.Pattern;
 import java.util.regex.Matcher;
 
@@ -55,6 +56,8 @@ class ParsedConfigFile {
   final String jobID;
 
   final boolean valid;
+  
+  final Properties properties = new Properties();
 
   private int maybeGetIntValue(String propName, String attr, String value,
       int oldValue) {
@@ -143,6 +146,8 @@ class ParsedConfigFile {
                 "true".equals(((Text) field.getFirstChild()).getData());
           }
         }
+        
+        properties.setProperty(attr, value);
 
         if ("mapred.child.java.opts".equals(attr) && value != null) {
           Matcher matcher = heapPattern.matcher(value);

Added: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ResourceUsageMetrics.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Captures the resource usage metrics.
+ */
+public class ResourceUsageMetrics implements Writable, DeepCompare  {
+  private long cumulativeCpuUsage;
+  private long virtualMemoryUsage;
+  private long physicalMemoryUsage;
+  private long heapUsage;
+  
+  public ResourceUsageMetrics() {
+  }
+  
+  /**
+   * Get the cumulative CPU usage.
+   */
+  public long getCumulativeCpuUsage() {
+    return cumulativeCpuUsage;
+  }
+  
+  /**
+   * Set the cumulative CPU usage.
+   */
+  public void setCumulativeCpuUsage(long usage) {
+    cumulativeCpuUsage = usage;
+  }
+  
+  /**
+   * Get the virtual memory usage.
+   */
+  public long getVirtualMemoryUsage() {
+    return virtualMemoryUsage;
+  }
+  
+  /**
+   * Set the virtual memory usage.
+   */
+  public void setVirtualMemoryUsage(long usage) {
+    virtualMemoryUsage = usage;
+  }
+  
+  /**
+   * Get the physical memory usage.
+   */
+  public long getPhysicalMemoryUsage() {
+    return physicalMemoryUsage;
+  }
+  
+  /**
+   * Set the physical memory usage.
+   */
+  public void setPhysicalMemoryUsage(long usage) {
+    physicalMemoryUsage = usage;
+  }
+  
+  /**
+   * Get the total heap usage.
+   */
+  public long getHeapUsage() {
+    return heapUsage;
+  }
+  
+  /**
+   * Set the total heap usage.
+   */
+  public void setHeapUsage(long usage) {
+    heapUsage = usage;
+  }
+  
+  /**
+   * Returns the size of the serialized data
+   */
+  public int size() {
+    int size = 0;
+    size += WritableUtils.getVIntSize(cumulativeCpuUsage);   // long #1
+    size += WritableUtils.getVIntSize(virtualMemoryUsage);   // long #2
+    size += WritableUtils.getVIntSize(physicalMemoryUsage);  // long #3
+    size += WritableUtils.getVIntSize(heapUsage);            // long #4
+    return size;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    cumulativeCpuUsage = WritableUtils.readVLong(in);  // long #1
+    virtualMemoryUsage = WritableUtils.readVLong(in);  // long #2
+    physicalMemoryUsage = WritableUtils.readVLong(in); // long #3
+    heapUsage = WritableUtils.readVLong(in);           // long #4
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // TODO: also write a version number for the serialized resource data
+    WritableUtils.writeVLong(out, cumulativeCpuUsage);  // long #1
+    WritableUtils.writeVLong(out, virtualMemoryUsage);  // long #2
+    WritableUtils.writeVLong(out, physicalMemoryUsage); // long #3
+    WritableUtils.writeVLong(out, heapUsage);           // long #4
+  }
+
+  private static void compareMetric(long m1, long m2, TreePath loc) 
+  throws DeepInequalityException {
+    if (m1 != m2) {
+      throw new DeepInequalityException("Value miscompared:" + loc.toString(), 
+                                        loc);
+    }
+  }
+  
+  private static void compareSize(ResourceUsageMetrics m1, 
+                                  ResourceUsageMetrics m2, TreePath loc) 
+  throws DeepInequalityException {
+    if (m1.size() != m2.size()) {
+      throw new DeepInequalityException("Size miscompared: " + loc.toString(), 
+                                        loc);
+    }
+  }
+  
+  @Override
+  public void deepCompare(DeepCompare other, TreePath loc)
+      throws DeepInequalityException {
+    if (!(other instanceof ResourceUsageMetrics)) {
+      throw new DeepInequalityException("Comparand has wrong type", loc);
+    }
+
+    ResourceUsageMetrics metrics2 = (ResourceUsageMetrics) other;
+    compareMetric(getCumulativeCpuUsage(), metrics2.getCumulativeCpuUsage(), 
+                  new TreePath(loc, "cumulativeCpu"));
+    compareMetric(getVirtualMemoryUsage(), metrics2.getVirtualMemoryUsage(), 
+                  new TreePath(loc, "virtualMemory"));
+    compareMetric(getPhysicalMemoryUsage(), metrics2.getPhysicalMemoryUsage(), 
+                  new TreePath(loc, "physicalMemory"));
+    compareMetric(getHeapUsage(), metrics2.getHeapUsage(), 
+                  new TreePath(loc, "heapUsage"));
+    compareSize(this, metrics2, new TreePath(loc, "size"));
+  }
+}
+
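
A minimal sketch (not part of the patch) that round-trips the new ResourceUsageMetrics through its Writable methods and then checks the result with deepCompare(); it is placed in the org.apache.hadoop.tools.rumen package so the helper types are visible, and the metric values are illustrative only.

    package org.apache.hadoop.tools.rumen;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public class ResourceUsageMetricsRoundTrip {
      public static void main(String[] args) throws Exception {
        ResourceUsageMetrics original = new ResourceUsageMetrics();
        original.setCumulativeCpuUsage(100);
        original.setVirtualMemoryUsage(300);
        original.setPhysicalMemoryUsage(200);
        original.setHeapUsage(400);

        // serialize via the Writable contract (four vlongs, as in write())
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize into a fresh instance
        ResourceUsageMetrics copy = new ResourceUsageMetrics();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));

        // deepCompare() throws DeepInequalityException on any mismatch
        original.deepCompare(copy, new TreePath(null, "<root>"));
        System.out.println("round trip preserved all four metrics");
      }
    }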

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptInfo.java Tue Oct 18 14:45:48 2011
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.tools.rumen;
 
-import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapred.TaskStatus.State;
 
 /**
@@ -38,7 +37,7 @@ public abstract class TaskAttemptInfo {
   }
 
   /**
-   * Get the final {@link TaskStatus.State} of the task-attempt.
+   * Get the final {@link State} of the task-attempt.
    * 
    * @return the final <code>State</code> of the task-attempt
    */

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java Tue Oct 18 14:45:48 2011
@@ -23,14 +23,22 @@ public class TaskInfo {
   private final long bytesOut;
   private final int recsOut;
   private final long maxMemory;
+  private final ResourceUsageMetrics metrics;
 
   public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
       long maxMemory) {
+    this(bytesIn, recsIn, bytesOut, recsOut, maxMemory, 
+         new ResourceUsageMetrics());
+  }
+  
+  public TaskInfo(long bytesIn, int recsIn, long bytesOut, int recsOut,
+                  long maxMemory, ResourceUsageMetrics metrics) {
     this.bytesIn = bytesIn;
     this.recsIn = recsIn;
     this.bytesOut = bytesOut;
     this.recsOut = recsOut;
     this.maxMemory = maxMemory;
+    this.metrics = metrics;
   }
 
   /**
@@ -70,4 +78,10 @@ public class TaskInfo {
     return maxMemory;
   }
 
+  /**
+   * @return Resource usage metrics
+   */
+  public ResourceUsageMetrics getResourceUsageMetrics() {
+    return metrics;
+  }
 }
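
A minimal sketch (not part of the patch) of the new six-argument TaskInfo constructor that attaches ResourceUsageMetrics; all values are illustrative.

    import org.apache.hadoop.tools.rumen.ResourceUsageMetrics;
    import org.apache.hadoop.tools.rumen.TaskInfo;

    public class TaskInfoMetricsSketch {
      public static void main(String[] args) {
        ResourceUsageMetrics metrics = new ResourceUsageMetrics();
        metrics.setCumulativeCpuUsage(1500);          // CPU milliseconds
        metrics.setVirtualMemoryUsage(256L << 20);    // bytes
        metrics.setPhysicalMemoryUsage(64L << 20);    // bytes
        metrics.setHeapUsage(32L << 20);              // bytes

        // the old five-argument constructor still works and supplies
        // a default (all-zero) ResourceUsageMetrics
        TaskInfo plain = new TaskInfo(1024, 10, 2048, 20, 128L << 20);

        // the new constructor carries the per-task resource usage
        TaskInfo withMetrics =
            new TaskInfo(1024, 10, 2048, 20, 128L << 20, metrics);

        System.out.println(
            withMetrics.getResourceUsageMetrics().getCumulativeCpuUsage());
        System.out.println(
            plain.getResourceUsageMetrics().getVirtualMemoryUsage());
      }
    }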

Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java Tue Oct 18 14:45:48 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
@@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configured
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.JobHistory;
 import org.apache.hadoop.util.Tool;
@@ -49,7 +51,6 @@ public class TraceBuilder extends Config
   static final int RUN_METHOD_FAILED_EXIT_CODE = 3;
 
   TopologyBuilder topologyBuilder = new TopologyBuilder();
-  JobConfigurationParser jobConfParser;
   Outputter<LoggedJob> traceWriter;
   Outputter<LoggedNetworkTopology> topologyWriter;
 
@@ -67,48 +68,136 @@ public class TraceBuilder extends Config
         IOException, ClassNotFoundException {
       int switchTop = 0;
 
+      // to determine if the input paths should be recursively scanned or not
+      boolean doRecursiveTraversal = false;
+
       while (args[switchTop].startsWith("-")) {
         if (args[switchTop].equalsIgnoreCase("-demuxer")) {
           inputDemuxerClass =
-              Class.forName(args[++switchTop]).asSubclass(InputDemuxer.class);
-
-          ++switchTop;
+            Class.forName(args[++switchTop]).asSubclass(InputDemuxer.class);
+        } else if (args[switchTop].equalsIgnoreCase("-recursive")) {
+          doRecursiveTraversal = true;
         }
+        ++switchTop;
       }
 
       traceOutput = new Path(args[0 + switchTop]);
       topologyOutput = new Path(args[1 + switchTop]);
 
       for (int i = 2 + switchTop; i < args.length; ++i) {
+        inputs.addAll(processInputArgument(
+            args[i], conf, doRecursiveTraversal));
+      }
+    }
 
-        Path thisPath = new Path(args[i]);
-
-        FileSystem fs = thisPath.getFileSystem(conf);
-        if (fs.getFileStatus(thisPath).isDir()) {
-          FileStatus[] statuses = fs.listStatus(thisPath);
-
-          List<String> dirNames = new ArrayList<String>();
+    /**
+     * Compare the history file names, not the full paths.
+     * The job history file name format is such that sorting the file names
+     * lexicographically orders the jobs by submission time.
+     */
+    private static class HistoryLogsComparator
+    implements Comparator<FileStatus> {
+      @Override
+      public int compare(FileStatus file1, FileStatus file2) {
+        return file1.getPath().getName().compareTo(
+            file2.getPath().getName());
+      }
+    }
 
-          for (FileStatus s : statuses) {
-            if (s.isDir()) continue;
-            String name = s.getPath().getName();
+    private static class InputFilter implements PathFilter {
+      public boolean accept(Path path) {
+        return !(path.getName().endsWith(".crc")
+                 || path.getName().startsWith("."));
+      }
+    }
 
-            if (!(name.endsWith(".crc") || name.startsWith("."))) {
-              dirNames.add(name);
+    /**
+     * List files (possibly recursively) and get their statuses.
+     * @param path The path of the file/dir for which ls is to be done
+     * @param fs FileSystem of the path
+     * @param filter the user-supplied path filter
+     * @param isRecursive whether to list files under directories recursively
+     * @return the list of file statuses under the given path
+     */
+    static List<FileStatus> listFiles(Path path, FileSystem fs,
+        PathFilter filter, boolean isRecursive) throws IOException {
+      List<FileStatus> list = new ArrayList<FileStatus>();
+      FileStatus[] statuses = fs.listStatus(path, filter);
+      if (statuses != null) {
+        for (FileStatus status : statuses) {
+          if (status.isDir()) {
+            if (isRecursive) {
+              list.addAll(listFiles(status.getPath(), fs, filter, isRecursive));
             }
+          } else {
+            list.add(status);
           }
+        }
+      }
+      return list;
+    }
 
-          String[] sortableNames = dirNames.toArray(new String[1]);
+    /**
+     * Processes the input file/folder argument. If the input is a file,
+     * then it is directly considered for further processing by TraceBuilder.
+     * If the input is a folder, then all the history logs in the
+     * input folder are considered for further processing.
+     *
+     * If isRecursive is true, then the input path is recursively scanned
+     * for job history logs for further processing by TraceBuilder.
+     *
+     * NOTE: If the input represents a globbed path, then it is first flattened
+     *       and then the individual paths represented by the globbed input
+     *       path are considered for further processing.
+     *
+     * @param input        input path, possibly globbed
+     * @param conf         configuration
+     * @param isRecursive  whether to recursively traverse the input paths to
+     *                     find history logs
+     * @return the input history log files' paths
+     * @throws FileNotFoundException
+     * @throws IOException
+     */
+    static List<Path> processInputArgument(String input, Configuration conf,
+        boolean isRecursive) throws FileNotFoundException, IOException {
+      Path inPath = new Path(input);
+      FileSystem fs = inPath.getFileSystem(conf);
+      FileStatus[] inStatuses = fs.globStatus(inPath);
+
+      List<Path> inputPaths = new LinkedList<Path>();
+      if (inStatuses == null || inStatuses.length == 0) {
+        return inputPaths;
+      }
 
-          Arrays.sort(sortableNames);
+      for (FileStatus inStatus : inStatuses) {
+        Path thisPath = inStatus.getPath();
+        if (inStatus.isDir()) {
+
+          // Find the list of files in this path (recursively if the
+          // -recursive option is specified).
+          List<FileStatus> historyLogs =
+              listFiles(thisPath, fs, new InputFilter(), isRecursive);
+          if (historyLogs.size() > 0) {
+            // Add the sorted history log file names in this path to the
+            // inputPaths list
+            FileStatus[] sortableNames =
+              historyLogs.toArray(new FileStatus[historyLogs.size()]);
+            Arrays.sort(sortableNames, new HistoryLogsComparator());
 
-          for (String dirName : sortableNames) {
-            inputs.add(new Path(thisPath, dirName));
+            for (FileStatus historyLog : sortableNames) {
+              inputPaths.add(historyLog.getPath());
+            }
           }
         } else {
-          inputs.add(thisPath);
+          inputPaths.add(thisPath);
         }
       }
+
+      return inputPaths;
     }
   }
 
@@ -169,25 +258,11 @@ public class TraceBuilder extends Config
     return jobId != null;
   }
 
-  private void addInterestedProperties(List<String> interestedProperties,
-      String[] names) {
-    for (String name : names) {
-      interestedProperties.add(name);
-    }
-  }
 
   @SuppressWarnings("unchecked")
   @Override
   public int run(String[] args) throws Exception {
     MyOptions options = new MyOptions(args, getConf());
-    List<String> interestedProperties = new ArrayList<String>();
-    {
-      for (JobConfPropertyNames candidateSet : JobConfPropertyNames.values()) {
-        addInterestedProperties(interestedProperties, candidateSet
-            .getCandidates());
-      }
-    }
-    jobConfParser = new JobConfigurationParser(interestedProperties);
     traceWriter = options.clazzTraceOutputter.newInstance();
     traceWriter.init(options.traceOutput, getConf());
     topologyWriter = new DefaultOutputter<LoggedNetworkTopology>();
@@ -232,7 +307,7 @@ public class TraceBuilder extends Config
               }
 
               if (isJobConfXml(filePair.first(), ris)) {
-                processJobConf(jobConfParser.parse(ris.rewind()), jobBuilder);
+                processJobConf(JobConfigurationParser.parse(ris.rewind()), jobBuilder);
               } else {
                 parser = JobHistoryParserFactory.getParser(ris);
                 if (parser == null) {

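For context, a minimal sketch (not part of this commit) of how the new -recursive
switch could be passed to TraceBuilder through ToolRunner; the output and input
paths below are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.rumen.TraceBuilder;
    import org.apache.hadoop.util.ToolRunner;

    public class RecursiveTraceExample {
      public static void main(String[] args) throws Exception {
        // "-recursive" makes TraceBuilder descend into sub-directories of the
        // input path when collecting job history logs.
        String[] traceBuilderArgs = {
            "-recursive",
            "file:///tmp/job-trace.json",      // trace output
            "file:///tmp/job-topology.json",   // topology output
            "file:///tmp/history"              // input directory of history logs
        };
        int exitCode = ToolRunner.run(new Configuration(), new TraceBuilder(),
            traceBuilderArgs);
        System.exit(exitCode);
      }
    }
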
Modified: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java?rev=1185694&r1=1185693&r2=1185694&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java Tue Oct 18 14:45:48 2011
@@ -120,8 +120,20 @@ public class ZombieJob implements JobSto
   @Override
   public synchronized JobConf getJobConf() {
     if (jobConf == null) {
-      // TODO : add more to jobConf ?
       jobConf = new JobConf();
+      
+      // Add parameters from the configuration recorded in the job trace.
+      //
+      // The job configuration parameters, as seen in the jobconf file, are
+      // added first so that the specialized values obtained from Rumen
+      // override the job conf values.
+      //
+      for (Map.Entry<Object, Object> entry : job.getJobProperties().entrySet()) {
+        jobConf.set(entry.getKey().toString(), entry.getValue().toString());
+      }
+      
+      // TODO: Eliminate parameters that are already copied from the job's
+      // configuration file.
       jobConf.setJobName(getName());
       jobConf.setUser(getUser());
       jobConf.setNumMapTasks(getNumberMaps());
@@ -622,6 +634,7 @@ public class ZombieJob implements JobSto
     long outputBytes = -1;
     long outputRecords = -1;
     long heapMegabytes = -1;
+    ResourceUsageMetrics metrics = new ResourceUsageMetrics();
 
     Values type = loggedTask.getTaskType();
     if ((type != Values.MAP) && (type != Values.REDUCE)) {
@@ -656,12 +669,15 @@ public class ZombieJob implements JobSto
             (job.getJobReduceMB() > 0) ? job.getJobReduceMB() : job
                 .getHeapMegabytes();
       }
+      // set the resource usage metrics
+      metrics = attempt.getResourceUsageMetrics();
       break;
     }
 
     TaskInfo taskInfo =
         new TaskInfo(inputBytes, (int) inputRecords, outputBytes,
-            (int) outputRecords, (int) heapMegabytes);
+            (int) outputRecords, (int) heapMegabytes,
+            metrics);
     return taskInfo;
   }
 

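To illustrate the effect of the getJobConf() change above, a minimal consumer
sketch (not part of this commit); the trace path and the queue-name property
lookup are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.tools.rumen.ZombieJob;
    import org.apache.hadoop.tools.rumen.ZombieJobProducer;

    public class ZombieJobConfExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical trace file; no cluster topology (null) for simplicity.
        ZombieJobProducer producer = new ZombieJobProducer(
            new Path("file:///tmp/job-trace.json"), null, conf);
        try {
          ZombieJob job;
          while ((job = producer.getNextJob()) != null) {
            // With this change, properties recorded in the trace are copied
            // into the JobConf first, and Rumen's specialized values (job
            // name, user, task counts) then override them.
            JobConf jobConf = job.getJobConf();
            System.out.println(jobConf.getJobName() + " queue="
                + jobConf.get("mapred.job.queue.name"));
          }
        } finally {
          producer.close();
        }
      }
    }
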
Added: hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/package-info.java?rev=1185694&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/package-info.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/tools/org/apache/hadoop/tools/rumen/package-info.java Tue Oct 18 14:45:48 2011
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Rumen is a data extraction and analysis tool built for 
+ * <a href="http://hadoop.apache.org/">Apache Hadoop</a>. Rumen mines job history
+ * logs to extract meaningful data and stores it in an easily-parsed format.
+ * 
+ * The default output format of Rumen is <a href="http://www.json.org">JSON</a>.
+ * Rumen uses the <a href="http://jackson.codehaus.org/">Jackson</a> library to 
+ * create JSON objects.
+ * <br><br>
+ * 
+ * The following classes can be used to programmatically invoke Rumen:
+ * <ol>
+ *   <li>
+ *    {@link org.apache.hadoop.tools.rumen.JobConfigurationParser}<br>
+ *      A parser to parse and filter out interesting properties from job 
+ *      configuration.
+ *      
+ *      <br><br>
+ *      <i>Sample code</i>:
+ *      <pre>
+ *      <code>
+ *        // An example to parse and filter out job name
+ *        
+ *        String conf_filename = .. // assume the job configuration filename here
+ *        
+ *        // construct a list of interesting properties
+ *        List<String> interestedProperties = new ArrayList<String>();
+ *        interestedProperties.add("mapreduce.job.name");
+ *        
+ *        JobConfigurationParser jcp = 
+ *          new JobConfigurationParser(interestedProperties);
+ *
+ *        InputStream in = new FileInputStream(conf_filename);
+ *        Properties parsedProperties = jcp.parse(in);
+ *     </code>
+ *     </pre>
+ *     Some of the commonly used interesting properties are enumerated in 
+ *     {@link org.apache.hadoop.tools.rumen.JobConfPropertyNames}. <br><br>
+ *     
+ *     <b>Note:</b>
+ *        A single instance of {@link org.apache.hadoop.tools.rumen.JobConfigurationParser} 
+ *        can be used to parse multiple job configuration files. 
+ *     
+ *   </li>
+ *   <li>
+ *    {@link org.apache.hadoop.tools.rumen.JobHistoryParser} <br>
+ *      A parser that parses job history files. It is an interface whose actual 
+ *      implementations are defined as an enum in 
+ *      {@link org.apache.hadoop.tools.rumen.JobHistoryParserFactory}. Note that
+ *      {@link org.apache.hadoop.tools.rumen.RewindableInputStream}
+ *      is a wrapper class around {@link java.io.InputStream} that makes the input 
+ *      stream rewindable.
+ *      
+ *      <br>
+ *      <i>Sample code</i>:
+ *      <pre>
+ *      <code>
+ *        // An example to parse a current job history file, i.e. a job history
+ *        // file for which the version is known
+ *        
+ *        String filename = .. // assume the job history filename here
+ *        
+ *        InputStream in = new FileInputStream(filename);
+ *        
+ *        HistoryEvent event = null;
+ *        
+ *        JobHistoryParser parser = new CurrentJHParser(in);
+ *        
+ *        event = parser.nextEvent();
+ *        // process all the events
+ *        while (event != null) {
+ *          // ... process the event
+ *          event = parser.nextEvent();
+ *        }
+ *        
+ *        // close the parser and the underlying stream
+ *        parser.close();
+ *      </code>
+ *      </pre>
+ *      
+ *      {@link org.apache.hadoop.tools.rumen.JobHistoryParserFactory} provides a 
+ *      {@link org.apache.hadoop.tools.rumen.JobHistoryParserFactory#getParser(org.apache.hadoop.tools.rumen.RewindableInputStream)}
+ *      API to get a parser for parsing the job history file. Note that this
+ *      API can be used if the job history version is unknown.<br><br>
+ *      <i>Sample code</i>:
+ *      <pre>
+ *      <code>
+ *        // An example to parse a job history for which the version is not 
+ *        // known, i.e. using JobHistoryParserFactory.getParser()
+ *        
+ *        String filename = .. // assume the job history filename here
+ *        
+ *        InputStream in = new FileInputStream(filename);
+ *        RewindableInputStream ris = new RewindableInputStream(in);
+ *        
+ *        // JobHistoryParserFactory will check and return a parser that can
+ *        // parse the file
+ *        JobHistoryParser parser = JobHistoryParserFactory.getParser(ris);
+ *        
+ *        // now use the parser to parse the events
+ *        HistoryEvent event = parser.nextEvent();
+ *        while (event != null) {
+ *          // ... process the event
+ *          event = parser.nextEvent();
+ *        }
+ *        
+ *        parser.close();
+ *      </code>
+ *      </pre>
+ *      <b>Note:</b>
+ *        Create one instance to parse a job history log and close it after use.
+ *  </li>
+ *  <li>
+ *    {@link org.apache.hadoop.tools.rumen.TopologyBuilder}<br>
+ *      Builds the cluster topology based on the job history events. Every 
+ *      job history file consists of events. Each event can be represented using
+ *      {@link org.apache.hadoop.mapreduce.jobhistory.HistoryEvent}. 
+ *      These events can be passed to {@link org.apache.hadoop.tools.rumen.TopologyBuilder} using 
+ *      {@link org.apache.hadoop.tools.rumen.TopologyBuilder#process(org.apache.hadoop.mapreduce.jobhistory.HistoryEvent)}.
+ *      A cluster topology can be represented using {@link org.apache.hadoop.tools.rumen.LoggedNetworkTopology}.
+ *      Once all the job history events are processed, the cluster 
+ *      topology can be obtained using {@link org.apache.hadoop.tools.rumen.TopologyBuilder#build()}.
+ *      
+ *      <br><br>
+ *      <i>Sample code</i>:
+ *      <pre>
+ *      <code>
+ *        // Building topology for a job history file represented using 
+ *        // 'filename' and the corresponding configuration file represented 
+ *        // using 'conf_filename'
+ *        String filename = .. // assume the job history filename here
+ *        String conf_filename = .. // assume the job configuration filename here
+ *        
+ *        InputStream jobConfInputStream = new FileInputStream(conf_filename);
+ *        InputStream jobHistoryInputStream = new FileInputStream(filename);
+ *        
+ *        TopologyBuilder tb = new TopologyBuilder();
+ *        
+ *        // construct a list of interesting properties
+ *        List<String> interestingProperties = new ArrayList<String>();
+ *        // add the interesting properties here
+ *        interestingProperties.add("mapreduce.job.name");
+ *        
+ *        JobConfigurationParser jcp = 
+ *          new JobConfigurationParser(interestingProperties);
+ *        
+ *        // parse the configuration file
+ *        tb.process(jcp.parse(jobConfInputStream));
+ *        
+ *        // read the job history file and pass it to the 
+ *        // TopologyBuilder.
+ *        JobHistoryParser parser = new CurrentJHParser(jobHistoryInputStream);
+ *        HistoryEvent e;
+ *        
+ *        // read and process all the job history events
+ *        while ((e = parser.nextEvent()) != null) {
+ *          tb.process(e);
+ *        }
+ *        
+ *        LoggedNetworkTopology topology = tb.build();
+ *      </code>
+ *      </pre>
+ *  </li>
+ *  <li>
+ *    {@link org.apache.hadoop.tools.rumen.JobBuilder}<br>
+ *      Summarizes a job history file.
+ *      {@link org.apache.hadoop.tools.rumen.TraceBuilder} provides  
+ *      {@link org.apache.hadoop.tools.rumen.TraceBuilder#extractJobID(String)} 
+ *      API for extracting job id from job history or job configuration files
+ *      which can be used for instantiating {@link org.apache.hadoop.tools.rumen.JobBuilder}. 
+ *      {@link org.apache.hadoop.tools.rumen.JobBuilder} generates a 
+ *      {@link org.apache.hadoop.tools.rumen.LoggedJob} object via 
+ *      {@link org.apache.hadoop.tools.rumen.JobBuilder#build()}. 
+ *      See {@link org.apache.hadoop.tools.rumen.LoggedJob} for more details.
+ *      
+ *      <br><br>
+ *      <i>Sample code</i>:
+ *      <pre>
+ *      <code>
+ *        // An example to summarize a current job history file 'filename'
+ *        // and the corresponding configuration file 'conf_filename'
+ *        
+ *        String filename = .. // assume the job history filename here
+ *        String conf_filename = .. // assume the job configuration filename here
+ *        
+ *        InputStream jobConfInputStream = new FileInputStream(conf_filename);
+ *        InputStream jobHistoryInputStream = new FileInputStream(filename);
+ *        
+ *        String jobID = TraceBuilder.extractJobID(filename);
+ *        JobBuilder jb = new JobBuilder(jobID);
+ *        
+ *        // construct a list of interesting properties
+ *        List<String> interestingProperties = new ArrayList<String>();
+ *        // add the interesting properties here
+ *        interestingProperties.add("mapreduce.job.name");
+ *        
+ *        JobConfigurationParser jcp = 
+ *          new JobConfigurationParser(interestingProperties);
+ *        
+ *        // parse the configuration file
+ *        jb.process(jcp.parse(jobConfInputStream));
+ *        
+ *        // parse the job history file
+ *        JobHistoryParser parser = new CurrentJHParser(jobHistoryInputStream);
+ *        try {
+ *          HistoryEvent e;
+ *          // read and process all the job history events
+ *          while ((e = parser.nextEvent()) != null) {
+ *            jb.process(e);
+ *          }
+ *        } finally {
+ *          parser.close();
+ *        }
+ *        
+ *        LoggedJob job = jb.build();
+ *      </code>
+ *      </pre>
+ *     <b>Note:</b>
+ *       The order of parsing the job configuration file or job history file is 
+ *       not important. Create one instance to parse the history file and job 
+ *       configuration.
+ *   </li>
+ *   <li>
+ *    {@link org.apache.hadoop.tools.rumen.DefaultOutputter}<br>
+ *      Implements {@link org.apache.hadoop.tools.rumen.Outputter} and writes 
+ *      JSON object in text format to the output file. 
+ *      {@link org.apache.hadoop.tools.rumen.DefaultOutputter} can be 
+ *      initialized with the output filename.
+ *      
+ *      <br><br>
+ *      <i>Sample code</i>:  
+ *      <pre>
+ *      <code>
+ *        // An example to summarize a current job history file represented by
+ *        // 'filename' and the configuration filename represented using 
+ *        // 'conf_filename'. Also output the job summary to 'out.json' along 
+ *        // with the cluster topology to 'topology.json'.
+ *        
+ *        String filename = .. // assume the job history filename here
+ *        String conf_filename = .. // assume the job configuration filename here
+ *        
+ *        Configuration conf = new Configuration();
+ *        DefaultOutputter outputter = new DefaultOutputter();
+ *        outputter.init("out.json", conf);
+ *        
+ *        InputStream jobConfInputStream = new FileInputStream(conf_filename);
+ *        InputStream jobHistoryInputStream = new FileInputStream(filename);
+ *        
+ *        // extract the job-id from the filename
+ *        String jobID = TraceBuilder.extractJobID(filename);
+ *        JobBuilder jb = new JobBuilder(jobID);
+ *        TopologyBuilder tb = new TopologyBuilder();
+ *        
+ *        // construct a list of interesting properties
+ *        List<String> interestingProperties = new ArrayList<String>();
+ *        // add the interesting properties here
+ *        interestingProperties.add("mapreduce.job.name");
+ *        
+ *        JobConfigurationParser jcp =
+ *          new JobConfigurationParser(interestingProperties);
+ *          
+ *        // parse the configuration file
+ *        tb.process(jcp.parse(jobConfInputStream));
+ *        
+ *        // read the job history file and pass it to the
+ *        // TopologyBuilder.
+ *        JobHistoryParser parser = new CurrentJHParser(jobHistoryInputStream);
+ *        HistoryEvent e;
+ *        while ((e = parser.nextEvent()) != null) {
+ *          jb.process(e);
+ *          tb.process(e);
+ *        }
+ *        
+ *        LoggedJob j = jb.build();
+ *        
+ *        // serialize the job summary in json (text) format
+ *        outputter.output(j);
+ *        
+ *        // close
+ *        outputter.close();
+ *        
+ *        outputter.init("topology.json", conf);
+ *        
+ *        // get the cluster topology using TopologyBuilder
+ *        LoggedNetworkTopology topology = tb.build();
+ *        
+ *        // serialize the cluster topology in json (text) format
+ *        outputter.output(topology);
+ *        
+ *        // close
+ *        outputter.close();
+ *      </code>
+ *      </pre>
+ *   </li>
+ *   <li>
+ *    {@link org.apache.hadoop.tools.rumen.JobTraceReader}<br>
+ *      A reader for reading {@link org.apache.hadoop.tools.rumen.LoggedJob} serialized using 
+ *      {@link org.apache.hadoop.tools.rumen.DefaultOutputter}. {@link org.apache.hadoop.tools.rumen.LoggedJob} 
+ *      provides various APIs for extracting job details. The most commonly
+ *      used ones are:
+ *        <ul>
+ *          <li>{@link org.apache.hadoop.tools.rumen.LoggedJob#getMapTasks()} : Get the map tasks</li>
+ *          <li>{@link org.apache.hadoop.tools.rumen.LoggedJob#getReduceTasks()} : Get the reduce tasks</li>
+ *          <li>{@link org.apache.hadoop.tools.rumen.LoggedJob#getOtherTasks()} : Get the setup/cleanup tasks</li>
+ *          <li>{@link org.apache.hadoop.tools.rumen.LoggedJob#getOutcome()} : Get the job's outcome</li>
+ *          <li>{@link org.apache.hadoop.tools.rumen.LoggedJob#getSubmitTime()} : Get the job's submit time</li>
+ *          <li>{@link org.apache.hadoop.tools.rumen.LoggedJob#getFinishTime()} : Get the job's finish time</li>
+ *        </ul>
+ *        
+ *      <br><br>
+ *      <i>Sample code</i>:
+ *      <pre>
+ *      <code>
+ *        // An example to read job summary from a trace file 'out.json'.
+ *        JobTraceReader reader = new JobTraceReader("out.json");
+ *        LoggedJob job = reader.getNext();
+ *        while (job != null) {
+ *          // .... process job level information
+ *          for (LoggedTask task : job.getMapTasks()) {
+ *            // process all the map tasks in the job
+ *            for (LoggedTaskAttempt attempt : task.getAttempts()) {
+ *              // process all the map task attempts in the job
+ *            }
+ *          }
+ *          
+ *          // get the next job
+ *          job = reader.getNext();
+ *        }
+ *        reader.close();
+ *      </code>
+ *      </pre>         
+ *   </li>
+ *   <li>
+ *    {@link org.apache.hadoop.tools.rumen.ClusterTopologyReader}<br>
+ *      A reader to read {@link org.apache.hadoop.tools.rumen.LoggedNetworkTopology} serialized using 
+ *      {@link org.apache.hadoop.tools.rumen.DefaultOutputter}. {@link org.apache.hadoop.tools.rumen.ClusterTopologyReader} can be 
+ *      initialized using the serialized topology filename. 
+ *      {@link org.apache.hadoop.tools.rumen.ClusterTopologyReader#get()} can
+ *      be used to get the 
+ *      {@link org.apache.hadoop.tools.rumen.LoggedNetworkTopology}. 
+ *      
+ *      <br><br>
+ *      <i>Sample code</i>:
+ *      <pre>
+ *      <code>
+ *        // An example to read the cluster topology from a topology output file
+ *        // 'topology.json'
+ *        ClusterTopologyReader reader = new ClusterTopologyReader("topology.json");
+ *        LoggedNetworkTopology topology = reader.get();
+ *        for (LoggedNetworkTopology t : topology.getChildren()) {
+ *          // process the cluster topology
+ *        }
+ *        reader.close();
+ *      </code>
+ *      </pre>
+ *   </li>
+ * </ol>     
+ */
+
+package org.apache.hadoop.tools.rumen;
\ No newline at end of file


