hadoop-mapreduce-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r814122 [1/2] - in /hadoop/mapreduce/trunk: ./ src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/data/rumen/zombie/ src/tools/org/apache/hadoop/tools/rumen/
Date Sat, 12 Sep 2009 09:31:36 GMT
Author: cdouglas
Date: Sat Sep 12 09:31:34 2009
New Revision: 814122

URL: http://svn.apache.org/viewvc?rev=814122&view=rev
Log:
MAPREDUCE-966. Modify Rumen to clean up interfaces and simplify integration
with other tools. Contributed by Hong Tang

Added:
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStoryProducer.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobTraceReader.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MachineNode.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Node.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RackNode.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java
Removed:
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Parser.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestHistograms.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java
    hadoop/mapreduce/trunk/src/test/tools/data/rumen/zombie/input-trace.json
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Histogram.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedDiscreteCDF.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Sep 12 09:31:34 2009
@@ -331,6 +331,9 @@
     MAPREDUCE-973. Move FailJob and SleepJob from examples to test. (cdouglas 
     via omalley)
 
+    MAPREDUCE-966. Modify Rumen to clean up interfaces and simplify integration
+    with other tools. (Hong Tang via cdouglas)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestHistograms.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestHistograms.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestHistograms.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestHistograms.java Sat Sep 12 09:31:34 2009
@@ -17,32 +17,25 @@
  */
 
 package org.apache.hadoop.tools.rumen;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.PrintStream;
-
-import junit.framework.TestCase;
 
 import java.util.List;
 
-import org.codehaus.jackson.JsonParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.codehaus.jackson.JsonEncoding;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.DeserializationConfig;
-import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 
-/**
- *
- */
-public class TestHistograms extends TestCase {
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestHistograms {
 
   /**
    * @throws IOException
@@ -58,107 +51,56 @@
    *           we read the corresponding goldXxx.json as a LoggedDiscreteCDF and
    *           deepCompare them.
    */
+  @Test
   public void testHistograms() throws IOException {
-    String rootInputDir = System.getProperty("test.tools.input.dir", "");
-
-    File rootInputDirFile = new File(rootInputDir);
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+    final Path rootInputDir = new Path(
+        System.getProperty("test.tools.input.dir", "")).makeQualified(lfs);
+    final Path rootInputFile = new Path(rootInputDir, "rumen/histogram-tests");
 
-    File rootInputFile = new File(rootInputDirFile, "rumen/histogram-tests");
 
-    if (rootInputDir.charAt(rootInputDir.length() - 1) == '/') {
-      rootInputDir = rootInputDir.substring(0, rootInputDir.length() - 1);
-    }
-
-    String[] tests = rootInputFile.list();
+    FileStatus[] tests = lfs.listStatus(rootInputFile);
 
     for (int i = 0; i < tests.length; ++i) {
-      if (tests[i].length() > 5 && "input".equals(tests[i].substring(0, 5))) {
-        File inputData = new File(rootInputFile, tests[i]);
-
-        if (!(new File(rootInputFile, "build" + tests[i].substring(5)))
-            .exists()
-            && !(new File(rootInputFile, "gold" + tests[i].substring(5))
-                .exists())
-            && !(new File(rootInputFile, "silver" + tests[i].substring(5))
-                .exists())) {
-          System.out
-              .println("Neither a build nor a gold file exists for the file, "
-                  + inputData.getCanonicalPath());
-
-          continue;
+      Path filePath = tests[i].getPath();
+      String fileName = filePath.getName();
+      if (fileName.startsWith("input")) {
+        String testName = fileName.substring("input".length());
+        Path goldFilePath = new Path(rootInputFile, "gold"+testName);
+        assertTrue("Gold file does not exist", lfs.exists(goldFilePath));
+        LoggedDiscreteCDF newResult = histogramFileToCDF(filePath, lfs);
+        System.out.println("Testing a Histogram for " + fileName);
+        FSDataInputStream goldStream = lfs.open(goldFilePath);
+        JsonObjectMapperParser<LoggedDiscreteCDF> parser = new JsonObjectMapperParser<LoggedDiscreteCDF>(
+            goldStream, LoggedDiscreteCDF.class); 
+        try {
+          LoggedDiscreteCDF dcdf = parser.getNext();
+          dcdf.deepCompare(newResult, new TreePath(null, "<root>"));
+        } catch (DeepInequalityException e) {
+          fail(e.path.toString());
         }
-
-        LoggedDiscreteCDF newResult = histogramFileToCDF(inputData.getPath());
-
-        if ((new File(rootInputFile, "build" + tests[i].substring(5))).exists()
-            && !(new File(rootInputFile, "gold" + tests[i].substring(5)))
-                .exists()
-            && !(new File(rootInputFile, "silver" + tests[i].substring(5)))
-                .exists()) {
-          try {
-            System.out.println("Building a new gold file for the file, "
-                + inputData.getCanonicalPath());
-            System.out.println("Please inspect it thoroughly and rename it.");
-
-            ObjectMapper mapper = new ObjectMapper();
-            JsonFactory factory = mapper.getJsonFactory();
-            PrintStream ostream = new PrintStream(new File(rootInputFile,
-                "silver" + tests[i].substring(5)));
-            JsonGenerator gen = factory.createJsonGenerator(ostream,
-                JsonEncoding.UTF8);
-            gen.useDefaultPrettyPrinter();
-
-            gen.writeObject(newResult);
-
-            gen.close();
-          } catch (IOException e) {
-            e.printStackTrace();
-          }
-        } else {
-          System.out.println("Testing a Histogram built from the file, "
-              + inputData.getCanonicalPath());
-          File goldCDF = new File(rootInputFile, "gold" + tests[i].substring(5));
-          FileInputStream goldStream = new FileInputStream(goldCDF);
-          BufferedReader goldReader = new BufferedReader(new InputStreamReader(
-              goldStream));
-          ObjectMapper goldMapper = new ObjectMapper();
-          JsonParser goldParser = goldMapper.getJsonFactory().createJsonParser(
-              goldReader);
-          LoggedDiscreteCDF DCDF = goldMapper.readValue(goldParser,
-              LoggedDiscreteCDF.class);
-
-          try {
-            DCDF.deepCompare(newResult, new TreePath(null, "<root>"));
-          } catch (DeepInequalityException e) {
-            String error = e.path.toString();
-
-            assertFalse(error, true);
-          }
+        finally {
+            parser.close();
         }
       }
     }
   }
 
-  private static LoggedDiscreteCDF histogramFileToCDF(String filename)
+  private static LoggedDiscreteCDF histogramFileToCDF(Path path, FileSystem fs)
       throws IOException {
-
-    File inputData = new File(filename);
-
-    FileInputStream dataStream = new FileInputStream(inputData);
-    BufferedReader dataReader = new BufferedReader(new InputStreamReader(
-        dataStream));
-    ObjectMapper dataMapper = new ObjectMapper();
-    dataMapper.configure(
-        DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
-    JsonParser dataParser = dataMapper.getJsonFactory().createJsonParser(
-        dataReader);
-    HistogramRawTestData data = dataMapper.readValue(dataParser,
-        HistogramRawTestData.class);
-
+    FSDataInputStream dataStream = fs.open(path);
+    JsonObjectMapperParser<HistogramRawTestData> parser = new JsonObjectMapperParser<HistogramRawTestData>(
+        dataStream, HistogramRawTestData.class);
+    HistogramRawTestData data;
+    try {
+      data = parser.getNext();
+    } finally {
+      parser.close();
+    }
+    
     Histogram hist = new Histogram();
-
     List<Long> measurements = data.getData();
-
     List<Long> typeProbeData = new HistogramRawTestData().getData();
 
     assertTrue(
@@ -180,7 +122,34 @@
     }
 
     result.setCDF(hist, percentiles, data.getScale());
-
     return result;
   }
+  
+  public static void main(String[] args) throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    for (String arg : args) {
+      Path filePath = new Path(arg).makeQualified(lfs);
+      String fileName = filePath.getName();
+      if (fileName.startsWith("input")) {
+        LoggedDiscreteCDF newResult = histogramFileToCDF(filePath, lfs);
+        String testName = fileName.substring("input".length());
+        Path goldFilePath = new Path(filePath.getParent(), "gold"+testName);
+
+        ObjectMapper mapper = new ObjectMapper();
+        JsonFactory factory = mapper.getJsonFactory();
+        FSDataOutputStream ostream = lfs.create(goldFilePath, true);
+        JsonGenerator gen = factory.createJsonGenerator(ostream,
+            JsonEncoding.UTF8);
+        gen.useDefaultPrettyPrinter();
+        
+        gen.writeObject(newResult);
+        
+        gen.close();
+      } else {
+        System.err.println("Input file name does not start with \"input\". File " + fileName + " skipped.");
+      }
+    }
+  }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java Sat Sep 12 09:31:34 2009
@@ -20,9 +20,10 @@
 
 import java.util.ArrayList;
 
-import junit.framework.TestCase;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
-public class TestPiecewiseLinearInterpolation extends TestCase {
+public class TestPiecewiseLinearInterpolation {
 
   static private double maximumRelativeError = 0.002D;
 
@@ -35,6 +36,7 @@
     return result;
   }
 
+  @Test
   public void testOneRun() {
     LoggedDiscreteCDF input = new LoggedDiscreteCDF();
 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Sat Sep 12 09:31:34 2009
@@ -18,33 +18,20 @@
 
 package org.apache.hadoop.tools.rumen;
 
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.ToolRunner;
 
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.DeserializationConfig;
-
-import junit.framework.TestCase;
-
-public class TestRumenJobTraces extends TestCase {
+import org.junit.Test;
+import static org.junit.Assert.*;
 
+public class TestRumenJobTraces {
+  @Test
   public void testSmallTrace() throws Exception {
     final Configuration conf = new Configuration();
     final FileSystem lfs = FileSystem.getLocal(conf);
@@ -59,11 +46,6 @@
     final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces");
     lfs.delete(tempDir, true);
 
-    assertFalse("property test.build.data is not defined",
-        "".equals(rootTempDir));
-    assertFalse("property test.tools.input.dir is not defined",
-        "".equals(rootInputDir));
-
     final Path topologyFile = new Path(tempDir, "topology.json");
     final Path traceFile = new Path(tempDir, "trace.json");
 
@@ -84,248 +66,50 @@
 
     args[5] = inputFile.toString();
 
-    PrintStream old_stdout = System.out;
-
-    final Path stdoutFile = new Path(tempDir, "stdout.text");
-
-    System.out.println("stdout file = " + stdoutFile);
-
-    PrintStream enveloped_stdout = new PrintStream(new BufferedOutputStream(
-          lfs.create(stdoutFile, true)));
-
     final Path topologyGoldFile = new Path(rootInputFile, 
         "job-tracker-logs-topology-output");
     final Path traceGoldFile = new Path(rootInputFile,
         "job-tracker-logs-trace-output");
 
-    try {
-      System.setOut(enveloped_stdout);
-
-      HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
-
-      int result = ToolRunner.run(analyzer, args);
-
-      enveloped_stdout.close();
-
-      assertEquals("Non-zero exit", 0, result);
-
-    } finally {
-      System.setOut(old_stdout);
-    }
-
-    jsonFileMatchesGold(lfs, topologyFile, topologyGoldFile,
-        new LoggedNetworkTopology(), "topology");
-    jsonFileMatchesGold(lfs, traceFile, traceGoldFile, new LoggedJob(),
-        "trace");
-
+    HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
+    int result = ToolRunner.run(analyzer, args);
+    assertEquals("Non-zero exit", 0, result);
+
+    TestRumenJobTraces
+        .<LoggedNetworkTopology> jsonFileMatchesGold(lfs, topologyFile,
+            topologyGoldFile, LoggedNetworkTopology.class, "topology");
+    TestRumenJobTraces.<LoggedJob> jsonFileMatchesGold(lfs, traceFile,
+        traceGoldFile, LoggedJob.class, "trace");
   }
 
-  /*
-   * This block of methods is commented out because its methods require huge
-   * test files to support them meaningfully. We expect to be able to fix this
-   * problem in a furture release.
-   * 
-   * public void testBulkFilesJobDistro() throws IOException { String args[] = {
-   * "-v1", "-delays", "-runtimes" }; statisticalTest(args,
-   * "rumen/large-test-inputs/monolithic-files",
-   * "rumen/large-test-inputs/gold-bulk-job-distribution.text", true); }
-   * 
-   * public void testIndividualFilesJobDistro() throws IOException { String
-   * args[] = { "-v1", "-delays", "-runtimes" }; statisticalTest(args,
-   * "rumen/large-test-inputs/individual-files",
-   * "rumen/large-test-inputs/gold-individual-job-distribution.text", true); }
-   * 
-   * public void testSpreadsGZFile() throws IOException { String args[] = {
-   * "-v1", "-delays", "-runtimes", "-spreads", "10", "90",
-   * "-job-digest-spectra", "10", "50", "90" }; statisticalTest( args,
-   * "rumen/large-test-inputs/monolithic-files/jobs-0-99-including-truncations.gz"
-   * , "rumen/large-test-inputs/gold-single-gz-task-distribution.text", false);
-   * }
-   * 
-   * public void testSpreadsSingleFile() throws IOException { String args[] = {
-   * "-v1", "-delays", "-runtimes", "-spreads", "10", "90",
-   * "-job-digest-spectra", "10", "50", "90" }; statisticalTest(args,
-   * "rumen/large-test-inputs/monolithic-files/jobs-100-199",
-   * "rumen/large-test-inputs/gold-single-bulk-task-distribution.text", false);
-   * }
-   */
-
-  /**
-   * 
-   * A test case of HadoopLogsAnalyzer.main consists of a call to this function.
-   * It succeeds by returning,fails by performing a junit assertion failure, and
-   * can abend with an I/O error if some of the inputs aren't there or some of
-   * the output cannot be written [due to quota, perhaps, or permissions
-   * 
-   * 
-   * @param args
-   *          these are the arguments that we eventually supply to
-   *          HadoopLogsAnalyzer.main to test its functionality with regard to
-   *          statistical output
-   * @param inputFname
-   *          this is the file name or directory name of the test input
-   *          directory relative to the test cases data directory.
-   * @param goldFilename
-   *          this is the file name of the expected output relative to the test
-   *          cases data directory.
-   * @param inputIsDirectory
-   *          this states whether the input is an entire directory, or a single
-   *          file.
-   * @throws IOException
-   */
-  private void statisticalTest(String args[], String inputFname,
-      String goldFilename, boolean inputIsDirectory) throws Exception {
-    File tempDirectory = new File(System.getProperty("test.build.data", "/tmp"));
-
-    String rootInputDir = System.getProperty("test.tools.input.dir", "");
-    String rootTempDir = System.getProperty("test.build.data", "");
-
-    File rootInputDirFile = new File(new File(rootInputDir), inputFname);
-    File tempDirFile = new File(rootTempDir);
-
-    assertFalse("property test.build.data is not defined", ""
-        .equals(rootTempDir));
-    assertFalse("property test.tools.input.dir is not defined", ""
-        .equals(rootInputDir));
-
-    if (rootInputDir.charAt(rootInputDir.length() - 1) == '/') {
-      rootInputDir = rootInputDir.substring(0, rootInputDir.length() - 1);
-    }
-
-    if (rootTempDir.charAt(rootTempDir.length() - 1) == '/') {
-      rootTempDir = rootTempDir.substring(0, rootTempDir.length() - 1);
-    }
-
-    File jobDistroGold = new File(new File(rootInputDir), goldFilename);
-
-    String[] newArgs = new String[args.length + 1];
-
-    System.arraycopy(args, 0, newArgs, 0, args.length);
-
-    newArgs[args.length + 1 - 1] = rootInputDirFile.getPath();
-
-    String complaint = inputIsDirectory ? " is not a directory."
-        : " does not exist.";
-
-    boolean okay = inputIsDirectory ? rootInputDirFile.isDirectory()
-        : rootInputDirFile.canRead();
-
-    assertTrue("The input file " + rootInputDirFile.getPath() + complaint, okay);
-
-    PrintStream old_stdout = System.out;
-
-    File stdoutFile = File.createTempFile("stdout", "text", tempDirFile);
-
-    // stdoutFile.deleteOnExit();
-
-    PrintStream enveloped_stdout = new PrintStream(new BufferedOutputStream(
-        new FileOutputStream(stdoutFile)));
-
-    try {
-      System.setOut(enveloped_stdout);
-
-      HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer();
-
-      int result = ToolRunner.run(analyzer, args);
-
-      enveloped_stdout.close();
-
-      System.setOut(old_stdout);
-
-      assertFilesMatch(stdoutFile, jobDistroGold);
-      assertEquals("Non-zero exit", 0, result);
-    } finally {
-      System.setOut(old_stdout);
-    }
-  }
-
-  static private Object readMapper(ObjectMapper mapper, JsonParser parser,
-      Object obj) throws IOException {
-    try {
-      return mapper.readValue(parser, obj.getClass());
-    } catch (EOFException e) {
-      return null;
-    }
-  }
-
-  static private void assertFilesMatch(File result, File gold)
-      throws IOException {
-    System.out.println("Comparing files: " + result.getPath() + " vrs. "
-        + gold.getPath());
-
-    int currentLineNumber = 1;
-    FileInputStream goldStream = new FileInputStream(gold);
-    BufferedReader goldReader = new BufferedReader(new InputStreamReader(
-        goldStream));
-    String currentGoldLine = goldReader.readLine();
-
-    FileInputStream resultStream = new FileInputStream(result);
-    BufferedReader resultReader = new BufferedReader(new InputStreamReader(
-        resultStream));
-    String currentResultLine = resultReader.readLine();
-
-    while (currentGoldLine != null && currentResultLine != null
-        && currentGoldLine.equals(currentResultLine)) {
-      ++currentLineNumber;
-
-      currentGoldLine = goldReader.readLine();
-      currentResultLine = resultReader.readLine();
-    }
-
-    if (currentGoldLine == null && currentResultLine == null) {
-      return;
-    }
-
-    assertFalse("Line number " + currentLineNumber + " disagrees", true);
-  }
-
-  static private void jsonFileMatchesGold(FileSystem lfs, Path result,
-        Path gold, Object obj, String fileDescription) throws IOException {
+  static private <T extends DeepCompare> void jsonFileMatchesGold(
+      FileSystem lfs, Path result, Path gold, Class<? extends T> clazz,
+      String fileDescription) throws IOException {
     InputStream goldStream = lfs.open(gold);
-    BufferedReader goldReader = new BufferedReader(new InputStreamReader(
-        goldStream));
-
+    JsonObjectMapperParser<T> goldParser = new JsonObjectMapperParser<T>(
+        goldStream, clazz);
     InputStream resultStream = lfs.open(result);
-    BufferedReader resultReader = new BufferedReader(new InputStreamReader(
-        resultStream));
-
-    ObjectMapper goldMapper = new ObjectMapper();
-    ObjectMapper resultMapper = new ObjectMapper();
-    goldMapper.configure(
-        DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
-    resultMapper.configure(
-        DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
-
-    JsonParser goldParser = goldMapper.getJsonFactory().createJsonParser(
-        goldReader);
-    JsonParser resultParser = resultMapper.getJsonFactory().createJsonParser(
-        resultReader);
-
-    DeepCompare goldJob = (DeepCompare) readMapper(goldMapper, goldParser, obj);
-    DeepCompare resultJob = (DeepCompare) readMapper(resultMapper,
-        resultParser, obj);
-
-    while (goldJob != null && resultJob != null) {
-      try {
-        resultJob.deepCompare(goldJob, new TreePath(null, "<root>"));
-      } catch (DeepInequalityException e) {
-        String error = e.path.toString();
+    JsonObjectMapperParser<T> resultParser = new JsonObjectMapperParser<T>(
+        resultStream, clazz);
+    try {
+      while (true) {
+        DeepCompare goldJob = goldParser.getNext();
+        DeepCompare resultJob = resultParser.getNext();
+        if ((goldJob == null) || (resultJob == null)) {
+          assertTrue(goldJob == resultJob);
+          break;
+        }
+
+        try {
+          resultJob.deepCompare(goldJob, new TreePath(null, "<root>"));
+        } catch (DeepInequalityException e) {
+          String error = e.path.toString();
 
-        assertFalse(fileDescription + " mismatches: " + error, true);
+          assertFalse(fileDescription + " mismatches: " + error, true);
+        }
       }
-
-      goldJob = (DeepCompare) readMapper(goldMapper, goldParser, obj);
-      resultJob = (DeepCompare) readMapper(resultMapper, resultParser, obj);
-    }
-
-    if (goldJob != null) {
-      assertFalse(
-          "The Gold File has more logged jobs than the result of the run", true);
-    }
-
-    if (resultJob != null) {
-      assertFalse("The result file has more logged jobs than the Gold File",
-          true);
+    } finally {
+      IOUtils.cleanup(null, goldParser, resultParser);
     }
   }
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java Sat Sep 12 09:31:34 2009
@@ -18,22 +18,20 @@
 
 package org.apache.hadoop.tools.rumen;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Vector;
 import java.util.List;
 import java.util.ArrayList;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskStatus.State;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskType;
 
-import junit.framework.TestCase;
-
-public class TestZombieJob extends TestCase {
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
 
+public class TestZombieJob {
   final double epsilon = 0.01;
   private final int[] attemptTimesPercentiles = new int[] { 10, 50, 90 };
   private long[] succeededCDF = new long[] { 5268, 5268, 5268, 5268, 5268 };
@@ -41,26 +39,26 @@
   private double[] expectedPs = new double[] { 0.000001, 0.18707660239708182,
       0.0013027618551328818, 2.605523710265763E-4 };
 
+  private final long[] mapTaskCounts = new long[] { 7838525L, 342277L, 100228L,
+      1564L, 1234L };
+  private final long[] reduceTaskCounts = new long[] { 4405338L, 139391L,
+      1514383L, 139391, 1234L };
+
   List<LoggedJob> loggedJobs = new ArrayList<LoggedJob>();
   List<JobStory> jobStories = new ArrayList<JobStory>();
 
-  /*
-   * (non-Javadoc)
-   * 
-   * @see junit.framework.TestCase#setUp()
-   */
-  protected void setUp() throws Exception {
-    String rootTempDir = System.getProperty("test.build.data", "/tmp");
-
-    String rootInputDir = System.getProperty("test.tools.input.dir", "");
-
-    File rootInputFile = new File(new File(rootInputDir), "rumen/zombie");
-    File tempDirFile = new File(rootTempDir);
-
-    Parser parser = new Parser(new FileReader(new File(rootInputFile,
-        "input-trace.json")));
-
-    parser.readTopology(new File(rootInputFile, "input-topology.json"));
+  @Before
+  public void setUp() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    final Path rootInputDir = new Path(
+        System.getProperty("test.tools.input.dir", "")).makeQualified(lfs);
+    final Path rootInputFile = new Path(rootInputDir, "rumen/zombie");
+
+    ZombieJobProducer parser = new ZombieJobProducer(new Path(rootInputFile,
+        "input-trace.json"), new ZombieCluster(new Path(rootInputFile,
+        "input-topology.json"), null, conf), conf);
 
     JobStory job = null;
     for (int i = 0; i < 4; i++) {
@@ -74,12 +72,6 @@
 
       System.out.println("Input Splits -- " + job.getInputSplits().length
           + ", " + job.getNumberMaps());
-      /*
-       * for (InputSplit split: job.getInputSplits()) {
-       * System.out.print(split.getLength() + ": "); for (String location:
-       * split.getLocations()) { System.out.print(location + ","); }
-       * System.out.println(); }
-       */
 
       System.out.println("Successful Map CDF -------");
       for (LoggedDiscreteCDF cdf : loggedJob.getSuccessfulMapAttemptCDFs()) {
@@ -125,21 +117,10 @@
       loggedJobs.add(loggedJob);
       jobStories.add(job);
     }
-
-    super.setUp();
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see junit.framework.TestCase#tearDown()
-   */
-  protected void tearDown() throws Exception {
-    super.tearDown();
   }
 
-  public void testFirstJob() throws FileNotFoundException, IOException,
-      InterruptedException {
+  @Test
+  public void testFirstJob() {
     // 20th job seems reasonable: "totalMaps":329,"totalReduces":101
     // successful map: 80 node-local, 196 rack-local, 53 rack-remote, 2 unknown
     // failed map: 0-0-0-1
@@ -177,8 +158,8 @@
     // TODO fill in test case
   }
 
-  public void testSecondJob() throws FileNotFoundException, IOException,
-      InterruptedException {
+  @Test
+  public void testSecondJob() {
     // 7th job has many failed tasks.
     // 3204 m, 0 r
     // successful maps 497-586-23-1, failed maps 0-0-0-2714
@@ -209,8 +190,8 @@
     // available data set.
   }
 
-  public void testFourthJob() throws FileNotFoundException, IOException,
-      InterruptedException {
+  @Test
+  public void testFourthJob() {
     // 7th job has many failed tasks.
     // 3204 m, 0 r
     // successful maps 497-586-23-1, failed maps 0-0-0-2714
@@ -241,8 +222,29 @@
     assertEquals(State.FAILED, taInfo.getRunState());
   }
 
-  public void testMakeUpInfo() throws FileNotFoundException, IOException,
-      InterruptedException {
+  @Test
+  public void testRecordIOInfo() {
+    JobStory job = jobStories.get(3);
+
+    TaskInfo mapTask = job.getTaskInfo(TaskType.MAP, 113);
+
+    TaskInfo reduceTask = job.getTaskInfo(TaskType.REDUCE, 0);
+
+    assertEquals(mapTaskCounts[0], mapTask.getInputBytes());
+    assertEquals(mapTaskCounts[1], mapTask.getInputRecords());
+    assertEquals(mapTaskCounts[2], mapTask.getOutputBytes());
+    assertEquals(mapTaskCounts[3], mapTask.getOutputRecords());
+    assertEquals(mapTaskCounts[4], mapTask.getTaskMemory());
+
+    assertEquals(reduceTaskCounts[0], reduceTask.getInputBytes());
+    assertEquals(reduceTaskCounts[1], reduceTask.getInputRecords());
+    assertEquals(reduceTaskCounts[2], reduceTask.getOutputBytes());
+    assertEquals(reduceTaskCounts[3], reduceTask.getOutputRecords());
+    assertEquals(reduceTaskCounts[4], reduceTask.getTaskMemory());
+  }
+
+  @Test
+  public void testMakeUpInfo() {
     // get many non-exist tasks
     // total 3204 map tasks, 3300 is a non-exist task.
     checkMakeUpTask(jobStories.get(3), 113, 1);
@@ -253,7 +255,7 @@
 
     Histogram sampleSucceeded = new Histogram();
     Histogram sampleFailed = new Histogram();
-    Vector<Integer> sampleAttempts = new Vector<Integer>();
+    List<Integer> sampleAttempts = new ArrayList<Integer>();
     for (int i = 0; i < 100000; i++) {
       int attemptId = 0;
       while (true) {

Modified: hadoop/mapreduce/trunk/src/test/tools/data/rumen/zombie/input-trace.json
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/zombie/input-trace.json?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/tools/data/rumen/zombie/input-trace.json (original)
+++ hadoop/mapreduce/trunk/src/test/tools/data/rumen/zombie/input-trace.json Sat Sep 12 09:31:34 2009
@@ -10951,7 +10951,7 @@
   "computonsPerReduceOutputByte" : -1,
   "submitTime" : 1240336853354,
   "launchTime" : 1240336854289,
-  "heapMegabytes" : -1,
+  "heapMegabytes" : 1234,
   "totalMaps" : 131,
   "totalReduces" : 47,
   "outcome" : "SUCCESS",

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java?rev=814122&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java Sat Sep 12 09:31:34 2009
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * {@link AbstractClusterStory} provides a partial implementation of
+ * {@link ClusterStory} by parsing the topology tree.
+ */
+public abstract class AbstractClusterStory implements ClusterStory {
+  protected Set<MachineNode> machineNodes;
+  protected Set<RackNode> rackNodes;
+  protected MachineNode[] mNodesFlattened;
+  protected Map<String, MachineNode> mNodeMap;
+  protected Map<String, RackNode> rNodeMap;
+  protected int maximumDistance = 0;
+  protected Random random;
+  
+  @Override
+  public Set<MachineNode> getMachines() {
+    parseTopologyTree();
+    return machineNodes;
+  }
+  
+  @Override
+  public synchronized Set<RackNode> getRacks() {
+    parseTopologyTree();    
+    return rackNodes;
+  }
+  
+  @Override
+  public synchronized MachineNode[] getRandomMachines(int expected) {
+    if (expected == 0) {
+      return new MachineNode[0];
+    }
+
+    parseTopologyTree();
+    int total = machineNodes.size();
+    int select = Math.min(expected, total);
+
+    if (mNodesFlattened == null) {
+      mNodesFlattened = machineNodes.toArray(new MachineNode[total]);
+      random = new Random();
+    }
+
+    MachineNode[] retval = new MachineNode[select];
+    int i = 0;
+    while ((i != select) && (total != i + select)) {
+      int index = random.nextInt(total - i);
+      MachineNode tmp = mNodesFlattened[index];
+      mNodesFlattened[index] = mNodesFlattened[total - i - 1];
+      mNodesFlattened[total - i - 1] = tmp;
+      ++i;
+    }
+    if (i == select) {
+      System.arraycopy(mNodesFlattened, total - i, retval, 0, select);
+    } else {
+      System.arraycopy(mNodesFlattened, 0, retval, 0, select);
+    }
+
+    return retval;
+  }
+  
+  protected synchronized void buildMachineNodeMap() {
+    if (mNodeMap == null) {
+      mNodeMap = new HashMap<String, MachineNode>(machineNodes.size());
+      for (MachineNode mn : machineNodes) {
+        mNodeMap.put(mn.getName(), mn);
+      }
+    }
+  }
+  
+  @Override
+  public MachineNode getMachineByName(String name) {
+    buildMachineNodeMap();
+    return mNodeMap.get(name);
+  }
+  
+  @Override
+  public int distance(Node a, Node b) {
+    int lvl_a = a.getLevel();
+    int lvl_b = b.getLevel();
+    int retval = 0;
+    if (lvl_a > lvl_b) {
+      retval = lvl_a-lvl_b;
+      for (int i=0; i<retval; ++i) {
+        a = a.getParent();
+      }
+    } else if (lvl_a < lvl_b) {
+      retval = lvl_b-lvl_a;
+      for (int i=0; i<retval; ++i) {
+        b = b.getParent();
+      }      
+    }
+    
+    while (a != b) {
+      a = a.getParent();
+      b = b.getParent();
+      ++retval;
+    }
+    
+    return retval;
+  }
+  
+  protected synchronized void buildRackNodeMap() {
+    if (rNodeMap == null) {
+      rNodeMap = new HashMap<String, RackNode>(rackNodes.size());
+      for (RackNode rn : rackNodes) {
+        rNodeMap.put(rn.getName(), rn);
+      }
+    }
+  }
+  
+  @Override
+  public RackNode getRackByName(String name) {
+    buildRackNodeMap();
+    return rNodeMap.get(name);
+  }
+  
+  @Override
+  public int getMaximumDistance() {
+    parseTopologyTree();
+    return maximumDistance;
+  }
+  
+  protected synchronized void parseTopologyTree() {
+    if (machineNodes == null) {
+      Node root = getClusterTopology();
+      SortedSet<MachineNode> mNodes = new TreeSet<MachineNode>();
+      SortedSet<RackNode> rNodes = new TreeSet<RackNode>();
+      // dfs search of the tree.
+      Deque<Node> unvisited = new ArrayDeque<Node>();
+      Deque<Integer> distUnvisited = new ArrayDeque<Integer>();
+      unvisited.add(root);
+      distUnvisited.add(0);
+      for (Node n = unvisited.poll(); n != null; n = unvisited.poll()) {
+        int distance = distUnvisited.poll();
+        if (n instanceof RackNode) {
+          rNodes.add((RackNode) n);
+          mNodes.addAll(((RackNode) n).getMachinesInRack());
+          if (distance + 1 > maximumDistance) {
+            maximumDistance = distance + 1;
+          }
+        } else if (n instanceof MachineNode) {
+          mNodes.add((MachineNode) n);
+          if (distance > maximumDistance) {
+            maximumDistance = distance;
+          }
+        } else {
+          for (Node child : n.getChildren()) {
+            unvisited.addFirst(child);
+            distUnvisited.addFirst(distance+1);
+          }
+        }
+      }
+
+      machineNodes = Collections.unmodifiableSortedSet(mNodes);
+      rackNodes = Collections.unmodifiableSortedSet(rNodes);
+    }
+  }
+}
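
The getRandomMachines() implementation above draws a sample without replacement by a partial Fisher-Yates shuffle: each chosen machine is swapped into the tail of the flattened array, and when more than half of the machines are requested the loop stops early and the untouched head is returned instead, since the complement of a uniform sample is itself uniform. Below is a small standalone sketch of that selection step; the class and method names are illustrative only and not part of the Rumen API.

    import java.util.Random;

    public class SampleWithoutReplacement {
      // Pick up to 'expected' distinct items from 'pool', mirroring the loop in
      // AbstractClusterStory.getRandomMachines(). The pool is cloned here so the
      // caller's array stays untouched; the Rumen version reuses a cached array.
      static String[] pick(String[] pool, int expected, Random random) {
        int total = pool.length;
        int select = Math.min(expected, total);
        String[] work = pool.clone();
        int i = 0;
        while (i != select && total != i + select) {
          int index = random.nextInt(total - i);  // choose from the unswapped head
          String tmp = work[index];               // swap the choice into the tail
          work[index] = work[total - i - 1];
          work[total - i - 1] = tmp;
          ++i;
        }
        String[] retval = new String[select];
        if (i == select) {
          System.arraycopy(work, total - i, retval, 0, select);  // tail holds the sample
        } else {
          System.arraycopy(work, 0, retval, 0, select);          // head is the sample
        }
        return retval;
      }

      public static void main(String[] args) {
        for (String host : pick(new String[] { "h1", "h2", "h3", "h4", "h5" }, 3,
            new Random(42))) {
          System.out.println(host);
        }
      }
    }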

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java Sat Sep 12 09:31:34 2009
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.tools.rumen;
 
-/**
- * 
- *
- */
 public class CDFPiecewiseLinearRandomGenerator extends CDFRandomGenerator {
 
   /**
@@ -43,7 +39,7 @@
     super(cdf, seed);
   }
 
-  /*
+  /**
    * TODO This code assumes that the empirical minimum resp. maximum is the
    * epistemological minimum resp. maximum. This is probably okay for the
    * minimum, because that likely represents a task where everything went well,

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java?rev=814122&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java Sat Sep 12 09:31:34 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Set;
+
+/**
+ * {@link ClusterStory} represents all configurations of a MapReduce cluster,
+ * including nodes, network topology, and slot configurations.
+ */
+public interface ClusterStory {
+  /**
+   * Get all machines of the cluster.
+   * @return A read-only set that contains all machines of the cluster.
+   */
+  public Set<MachineNode> getMachines();
+
+  /**
+   * Get all racks of the cluster.
+   * @return A read-only set that contains all racks of the cluster.
+   */
+  public Set<RackNode> getRacks();
+  
+  /**
+   * Get the cluster topology tree.
+   * @return The root node of the cluster topology tree.
+   */
+  public Node getClusterTopology();
+  
+  /**
+   * Select a random set of machines.
+   * @param expected The expected sample size.
+   * @return An array of up to expected number of {@link MachineNode}s.
+   */
+  public MachineNode[] getRandomMachines(int expected);
+
+  /**
+   * Get {@link MachineNode} by its host name.
+   * 
+   * @return The {@link MachineNode} with the same name. Or null if not found.
+   */
+  public MachineNode getMachineByName(String name);
+  
+  /**
+   * Get {@link RackNode} by its name.
+   * @return The {@link RackNode} with the same name. Or null if not found.
+   */
+  public RackNode getRackByName(String name);
+
+  /**
+   * Determine the distance between two {@link Node}s. Currently, the distance
+   * is loosely defined as the length of the longer path for either a or b to
+   * reach their common ancestor.
+   * 
+   * @param a
+   * @param b
+   * @return The distance between {@link Node} a and {@link Node} b.
+   */
+  int distance(Node a, Node b);
+  
+  /**
+   * Get the maximum distance possible between any two nodes.
+   * @return the maximum distance possible between any two nodes.
+   */
+  int getMaximumDistance();
+}
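
As a usage sketch, a ClusterStory would normally be obtained from a parsed topology. ZombieCluster, added by this commit, is one implementation, constructed below the same way TestZombieJob.setUp() does; the topology path is a placeholder, and treating ZombieCluster as a ClusterStory is an assumption made for illustration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.rumen.ClusterStory;
    import org.apache.hadoop.tools.rumen.MachineNode;
    import org.apache.hadoop.tools.rumen.ZombieCluster;

    public class ClusterStoryExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same constructor shape as in TestZombieJob.setUp(); path is illustrative.
        ClusterStory cluster = new ZombieCluster(
            new Path("input-topology.json"), null, conf);

        System.out.println("machines: " + cluster.getMachines().size());
        System.out.println("racks: " + cluster.getRacks().size());
        System.out.println("maximum distance: " + cluster.getMaximumDistance());

        // Draw a small uniform sample of machines, e.g. to place simulated attempts.
        for (MachineNode mn : cluster.getRandomMachines(3)) {
          System.out.println("sampled: " + mn.getName());
        }
      }
    }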

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java?rev=814122&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java Sat Sep 12 09:31:34 2009
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Reads a JSON-encoded cluster topology and produces the parsed
+ * {@link LoggedNetworkTopology} object.
+ */
+public class ClusterTopologyReader {
+  private LoggedNetworkTopology topology;
+
+  private void readTopology(JsonObjectMapperParser<LoggedNetworkTopology> parser)
+      throws IOException {
+    try {
+      topology = parser.getNext();
+      if (topology == null) {
+        throw new IOException(
+            "Input file does not contain valid topology data.");
+      }
+    } finally {
+      parser.close();
+    }
+  }
+
+  /**
+   * Constructor.
+   * 
+   * @param path
+   *          Path to the JSON-encoded topology file, possibly compressed.
+   * @param conf
+   * @throws IOException
+   */
+  public ClusterTopologyReader(Path path, Configuration conf)
+      throws IOException {
+    JsonObjectMapperParser<LoggedNetworkTopology> parser = new JsonObjectMapperParser<LoggedNetworkTopology>(
+        path, LoggedNetworkTopology.class, conf);
+    readTopology(parser);
+  }
+
+  /**
+   * Constructor.
+   * 
+   * @param input
+   *          The input stream for the JSON-encoded topology data.
+   */
+  public ClusterTopologyReader(InputStream input) throws IOException {
+    JsonObjectMapperParser<LoggedNetworkTopology> parser = new JsonObjectMapperParser<LoggedNetworkTopology>(
+        input, LoggedNetworkTopology.class);
+    readTopology(parser);
+  }
+
+  /**
+   * Get the {@link LoggedNetworkTopology} object.
+   * 
+   * @return The {@link LoggedNetworkTopology} object parsed from the input.
+   */
+  public LoggedNetworkTopology get() {
+    return topology;
+  }
+}
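
A minimal reading sketch for the class above. The file path is a placeholder, and the getName()/getChildren() accessors on LoggedNetworkTopology are assumed from their use elsewhere in Rumen rather than shown in this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.rumen.ClusterTopologyReader;
    import org.apache.hadoop.tools.rumen.LoggedNetworkTopology;

    public class TopologyReaderExample {
      public static void main(String[] args) throws Exception {
        // The constructor throws IOException if the file holds no valid topology.
        ClusterTopologyReader reader = new ClusterTopologyReader(
            new Path("topology.json"), new Configuration());
        LoggedNetworkTopology topology = reader.get();
        System.out.println("root: " + topology.getName() + ", first-level children: "
            + topology.getChildren().size());
      }
    }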

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Sat Sep 12 09:31:34 2009
@@ -66,7 +66,7 @@
 import org.codehaus.jackson.map.SerializationConfig;
 
 /**
- * This is the mainclass for rumen log mining functionality.
+ * This is the main class for rumen log mining functionality.
  * 
  * It reads a directory of job tracker logs, and computes various information
  * about it. See {@code usage()}, below.
@@ -85,11 +85,12 @@
    */
   private final static int MAXIMUM_PREFERRED_LOCATIONS = 25;
 
-  // This element is to compensate for the fact that our percentiles
-  // engine rounds up for the expected sample count, so if the total
-  // number of readings is small enough we need to compensate slightly
-  // when aggregating the spread data from jobs with few reducers together
-  // with jobs with many reducers.
+  /**
+   * This element is to compensate for the fact that our percentiles engine
+   * rounds up for the expected sample count, so if the total number of readings
+   * is small enough we need to compensate slightly when aggregating the spread
+   * data from jobs with few reducers together with jobs with many reducers.
+   */
   private static final long SMALL_SPREAD_COMPENSATION_THRESHOLD = 5L;
 
   /**
@@ -317,7 +318,7 @@
     Path jobTraceFilename = null;
     Path topologyFilename = null;
     if (args.length == 0 || args[args.length - 1].charAt(0) == '-') {
-      inputFilename = null;
+      throw new IllegalArgumentException("No input specified.");
     } else {
       inputFilename = args[args.length - 1];
     }
@@ -547,6 +548,7 @@
       IOException {
     if (input != null) {
       input.close();
+      LOG.info("File closed: "+currentFileName);
       input = null;
     }
 
@@ -580,6 +582,9 @@
 
   private String readInputLine() throws IOException {
     try {
+      if (input == null) {
+        return null;
+      }
       inputLineText.clear();
       if (input.readLine(inputLineText) == 0) {
         return null;
@@ -1002,7 +1007,7 @@
 
           ParsedHost node = getAndRecordParsedHost(nextSplit);
 
-          if (locations != null) {
+          if (locations != null && node != null) {
             locations.add(node.makeLoggedLocation());
           }
         }
@@ -1104,82 +1109,95 @@
   private void incorporateCounters(LoggedTaskAttempt attempt2,
       String counterString) {
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.hdfsBytesRead = val;
       }
     }, counterString, "HDFS_BYTES_READ");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.hdfsBytesWritten = val;
       }
     }, counterString, "HDFS_BYTES_WRITTEN");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.fileBytesRead = val;
       }
     }, counterString, "FILE_BYTES_READ");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.fileBytesWritten = val;
       }
     }, counterString, "FILE_BYTES_WRITTEN");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.mapInputBytes = val;
       }
     }, counterString, "MAP_INPUT_BYTES");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.mapInputRecords = val;
       }
     }, counterString, "MAP_INPUT_RECORDS");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.mapOutputBytes = val;
       }
     }, counterString, "MAP_OUTPUT_BYTES");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.mapOutputRecords = val;
       }
     }, counterString, "MAP_OUTPUT_RECORDS");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.combineInputRecords = val;
       }
     }, counterString, "COMBINE_INPUT_RECORDS");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.reduceInputGroups = val;
       }
     }, counterString, "REDUCE_INPUT_GROUPS");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.reduceInputRecords = val;
       }
     }, counterString, "REDUCE_INPUT_RECORDS");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.reduceShuffleBytes = val;
       }
     }, counterString, "REDUCE_SHUFFLE_BYTES");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.reduceOutputRecords = val;
       }
     }, counterString, "REDUCE_OUTPUT_RECORDS");
     incorporateCounter(new SetField(attempt2) {
+      @Override
       void set(long val) {
         attempt.spilledRecords = val;
       }
     }, counterString, "SPILLED_RECORDS");
   }
 
-  private ParsedHost getAndRecordParsedHost(String hostName)
-      throws IllegalArgumentException {
-    ParsedHost result = new ParsedHost(hostName);
+  private ParsedHost getAndRecordParsedHost(String hostName) {
+    ParsedHost result = ParsedHost.parse(hostName);
 
-    if (!allHosts.contains(result)) {
+    if (result != null && !allHosts.contains(result)) {
       allHosts.add(result);
     }
 
@@ -1259,28 +1277,23 @@
 
         ParsedHost host = null;
 
-        try {
-          host = getAndRecordParsedHost(hostName);
-        } catch (IllegalArgumentException e) {
-        }
+        host = getAndRecordParsedHost(hostName);
 
         if (host != null) {
           attempt.setLocation(host.makeLoggedLocation());
         }
 
-        if (task != null) {
-          ArrayList<LoggedLocation> locs = task.getPreferredLocations();
+        ArrayList<LoggedLocation> locs = task.getPreferredLocations();
 
-          if (host != null && locs != null) {
-            for (LoggedLocation loc : locs) {
-              ParsedHost preferedLoc = new ParsedHost(loc);
+        if (host != null && locs != null) {
+          for (LoggedLocation loc : locs) {
+            ParsedHost preferedLoc = new ParsedHost(loc);
 
-              distance = Math.min(distance, preferedLoc.distance(host));
-            }
+            distance = Math.min(distance, preferedLoc.distance(host));
           }
-
-          mapperLocality.enter(distance);
         }
+
+        mapperLocality.enter(distance);
       }
 
       distance = Math.min(distance, successfulMapAttemptTimes.length - 1);

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Histogram.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Histogram.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Histogram.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Histogram.java Sat Sep 12 09:31:34 2009
@@ -127,12 +127,14 @@
     long cumulativeCount = 0;
     int bucketCursor = 0;
 
+    
     // Loop invariant: the item at buckets[bucketCursor] can still be reached
-    // from iter, and the number of logged elements no longer
-    // available from iter is cumulativeCount .
-    //
+    // from iter, and the number of logged elements no longer available from
+    // iter is cumulativeCount.
+    // 
     // cumulativeCount/totalCount is therefore strictly less than
     // buckets[bucketCursor]/scale .
+     
     while (iter.hasNext()) {
       long targetCumulativeCount = buckets[bucketCursor] * totalCount / scale;
 

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java Sat Sep 12 09:31:34 2009
@@ -18,6 +18,7 @@
 package org.apache.hadoop.tools.rumen;
 
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobHistory;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -99,7 +100,6 @@
   /**
    * Get {@link TaskAttemptInfo} for a given task-attempt, considering impact
    * of locality.
-   * @param taskType {@link TaskType} of the task-attempt
    * @param taskNumber Partition number of the task-attempt
    * @param taskAttemptNumber Attempt number of the task
    * @param locality Data locality of the task as scheduled in simulation
@@ -109,4 +109,10 @@
     getMapTaskAttemptInfoAdjusted(int taskNumber,
                                   int taskAttemptNumber,
                                   int locality);
+  
+  /**
+   * Get the outcome of the job execution.
+   * @return The outcome of the job execution.
+   */
+  public JobHistory.Values getOutcome();
 }

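The new getOutcome() accessor lets a consumer branch on how the traced job ended. A minimal sketch, assuming job is any JobStory instance and that SUCCESS is one of the constants of the pre-existing JobHistory.Values enum imported above:

  static boolean isSuccessful(JobStory job) {
    // Treat the job as successful only if the trace records a SUCCESS outcome.
    return job.getOutcome() == JobHistory.Values.SUCCESS;
  }
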
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStoryProducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStoryProducer.java?rev=814122&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStoryProducer.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStoryProducer.java Sat Sep 12 09:31:34 2009
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * {@link JobStoryProducer} produces a sequence of {@link JobStory} objects.
+ */
+public interface JobStoryProducer extends Closeable {
+  /**
+   * Get the next job.
+   * @return The next job, or null if no more jobs are available.
+   * @throws IOException 
+   */
+  JobStory getNextJob() throws IOException;
+}

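A minimal consumer sketch for the new interface; the helper method name is made up and the processing step is left as a comment:

  static void consumeAll(JobStoryProducer producer) throws IOException {
    try {
      JobStory job;
      // getNextJob() returns null once the producer is exhausted.
      while ((job = producer.getNextJob()) != null) {
        // hand the JobStory to a simulator or other consumer here
      }
    } finally {
      producer.close();   // JobStoryProducer extends Closeable
    }
  }
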
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobTraceReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobTraceReader.java?rev=814122&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobTraceReader.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobTraceReader.java Sat Sep 12 09:31:34 2009
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Reads JSON-encoded job traces and produces {@link LoggedJob} instances.
+ */
+public class JobTraceReader extends JsonObjectMapperParser<LoggedJob> {
+  /**
+   * Constructor.
+   * 
+   * @param path
+   *          Path to the JSON trace file, possibly compressed.
+   * @param conf
+   * @throws IOException
+   */
+  public JobTraceReader(Path path, Configuration conf) throws IOException {
+    super(path, LoggedJob.class, conf);
+  }
+
+  /**
+   * Constructor.
+   * 
+   * @param input
+   *          The input stream for the JSON trace.
+   */
+  public JobTraceReader(InputStream input) throws IOException {
+    super(input, LoggedJob.class);
+  }
+}

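A minimal usage sketch, assuming an illustrative trace path; getNext() is inherited from JsonObjectMapperParser (added below) and returns null at end of stream:

  static void readTrace(Configuration conf) throws IOException {
    Path trace = new Path("/tmp/input-trace.json");   // illustrative path only
    JobTraceReader reader = new JobTraceReader(trace, conf);
    try {
      LoggedJob job;
      while ((job = reader.getNext()) != null) {
        // inspect the LoggedJob here
      }
    } finally {
      reader.close();
    }
  }
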
Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java?rev=814122&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java Sat Sep 12 09:31:34 2009
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * A simple wrapper for parsing JSON-encoded data using ObjectMapper.
+ * @param <T> The (base) type of the object(s) to be parsed by this parser.
+ */
+class JsonObjectMapperParser<T> implements Closeable {
+  private final ObjectMapper mapper;
+  private final Class<? extends T> clazz;
+  private final JsonParser jsonParser;
+  private final Decompressor decompressor;
+
+  /**
+   * Constructor.
+   * 
+   * @param path 
+   *          Path to the JSON data file, possibly compressed.
+   * @param conf
+   * @throws IOException
+   */
+  public JsonObjectMapperParser(Path path, Class<? extends T> clazz,
+      Configuration conf) throws IOException {
+    mapper = new ObjectMapper();
+    mapper.configure(
+        DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    this.clazz = clazz;
+    FileSystem fs = path.getFileSystem(conf);
+    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
+    InputStream input;
+    if (codec == null) {
+      input = fs.open(path);
+      decompressor = null;
+    } else {
+      FSDataInputStream fsdis = fs.open(path);
+      decompressor = CodecPool.getDecompressor(codec);
+      input = codec.createInputStream(fsdis, decompressor);
+    }
+    jsonParser = mapper.getJsonFactory().createJsonParser(input);
+  }
+
+  /**
+   * Constructor.
+   * 
+   * @param input
+   *          The input stream for the JSON data.
+   */
+  public JsonObjectMapperParser(InputStream input, Class<? extends T> clazz)
+      throws IOException {
+    mapper = new ObjectMapper();
+    mapper.configure(
+        DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    this.clazz = clazz;
+    decompressor = null;
+    jsonParser = mapper.getJsonFactory().createJsonParser(input);
+  }
+
+  /**
+   * Get the next object from the trace.
+   * 
+   * @return The next instance of the object, or null if the end of the
+   *         stream is reached.
+   * @throws IOException
+   */
+  public T getNext() throws IOException {
+    try {
+      return mapper.readValue(jsonParser, clazz);
+    } catch (EOFException e) {
+      return null;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      jsonParser.close();
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
+    }
+  }
+}

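Because the parser is generic over the deserialized type, binding it to another logged type only takes a thin subclass, in the same way JobTraceReader binds it to LoggedJob. A hypothetical example (the reader name is made up; LoggedNetworkTopology is modified later in this commit):

  class TopologyJsonReader extends JsonObjectMapperParser<LoggedNetworkTopology> {
    public TopologyJsonReader(Path path, Configuration conf) throws IOException {
      super(path, LoggedNetworkTopology.class, conf);
    }
  }
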
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedDiscreteCDF.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedDiscreteCDF.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedDiscreteCDF.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedDiscreteCDF.java Sat Sep 12 09:31:34 2009
@@ -50,20 +50,8 @@
   void setCDF(Histogram data, int[] steps, int modulus) {
 
     numberValues = data.getTotalCount();
-
-    // HadoopLogsAnalyzer.printStackTrace();
-    // data.dump(System.out);
-
     long[] CDF = data.getCDF(modulus, steps);
 
-    /*
-     * if (CDF == null) { System.out.print("(null result)\n"); } else { for
-     * (long step : CDF) { System.out.print("One result row: " + step + "\n"); }
-     * }
-     * 
-     * System.out.print("\n");
-     */
-
     if (CDF != null) {
       minimum = CDF[0];
       maximum = CDF[CDF.length - 1];

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Sat Sep 12 09:31:34 2009
@@ -102,7 +102,7 @@
     setJobID(jobID);
   }
 
-  // TODO consider having default readers on the other objects
+  @SuppressWarnings("unused") // the "ignored" input parameter is intentionally unused.
   @JsonAnySetter
   public void setUnknownAttribute(String attributeName, Object ignored) {
     if (!alreadySeenAnySetterAttributes.contains(attributeName)) {

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java Sat Sep 12 09:31:34 2009
@@ -37,9 +37,11 @@
  * 
  */
 public class LoggedLocation implements DeepCompare {
-  // The full path from the root of the network to the host.
-  //
-  // NOTE that this assumes that the network topology is a tree.
+  /**
+   * The full path from the root of the network to the host.
+   * 
+   * NOTE that this assumes that the network topology is a tree.
+   */ 
   List<String> layers = new ArrayList<String>();
 
   public List<String> getLayers() {

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java Sat Sep 12 09:31:34 2009
@@ -50,7 +50,7 @@
    * order.
    * 
    */
-  static private class TopoSort implements Comparator<LoggedNetworkTopology> {
+  static class TopoSort implements Comparator<LoggedNetworkTopology> {
     public int compare(LoggedNetworkTopology t1, LoggedNetworkTopology t2) {
       return t1.name.compareTo(t2.name);
     }

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Sat Sep 12 09:31:34 2009
@@ -245,11 +245,4 @@
     compareLoggedLocations(preferredLocations, other.preferredLocations, loc,
         "preferredLocations");
   }
-  /*
-   * ArrayList<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>();
-   * 
-   * ArrayList<LoggedLocation> preferredLocations;
-   * 
-   * int numberMaps = -1; int numberReduces = -1;
-   */
 }

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MachineNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MachineNode.java?rev=814122&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MachineNode.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MachineNode.java Sat Sep 12 09:31:34 2009
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * {@link MachineNode} represents the configuration of a cluster node.
+ * {@link MachineNode} should be constructed by {@link MachineNode.Builder}.
+ */
+public final class MachineNode extends Node {
+  long memory = -1; // in KB
+  int mapSlots = 1;
+  int reduceSlots = 1;
+  long memoryPerMapSlot = -1; // in KB
+  long memoryPerReduceSlot = -1; // in KB
+  int numCores = 1;
+  
+  MachineNode(String name, int level) {
+    super(name, level);
+  }
+  
+  @Override
+  public boolean equals(Object obj) {
+    // name/level sufficient
+    return super.equals(obj);
+  }
+
+  @Override
+  public int hashCode() {
+    // match equals
+    return super.hashCode();
+  }
+
+  /**
+   * Get the available physical RAM of the node.
+   * @return The available physical RAM of the node, in KB.
+   */
+  public long getMemory() {
+    return memory;
+  }
+  
+  /**
+   * Get the number of map slots of the node.
+   * @return The number of map slots of the node.
+   */
+  public int getMapSlots() {
+    return mapSlots;
+  }
+  
+  /**
+   * Get the number of reduce slots of the node.
+   * @return The number of reduce slots of the node.
+   */
+  public int getReduceSlots() {
+    return reduceSlots;
+  }
+  
+  /**
+   * Get the amount of RAM reserved for each map slot.
+   * @return the amount of RAM reserved for each map slot, in KB.
+   */
+  public long getMemoryPerMapSlot() {
+    return memoryPerMapSlot;
+  }
+
+  /**
+   * Get the amount of RAM reserved for each reduce slot.
+   * @return the amount of RAM reserved for each reduce slot, in KB.
+   */
+  public long getMemoryPerReduceSlot() {
+    return memoryPerReduceSlot;
+  }
+  
+  /**
+   * Get the number of cores of the node.
+   * @return the number of cores of the node.
+   */
+  public int getNumCores() {
+    return numCores;
+  }
+
+  /**
+   * Get the rack node that the machine belongs to.
+   * 
+   * @return The rack node that the machine belongs to. Returns null if the
+   *         machine does not belong to any rack.
+   */
+  public RackNode getRackNode() {
+    return (RackNode)getParent();
+  }
+  
+  @Override
+  public synchronized boolean addChild(Node child) {
+    throw new IllegalStateException("Cannot add child to MachineNode");
+  }
+
+  /**
+   * Builder for a {@link MachineNode} object.
+   */
+  public static final class Builder {
+    private MachineNode node;
+    
+    /**
+     * Start building a new {@link MachineNode} object.
+     * @param name
+     *          Unique name of the node. Typically the fully qualified domain
+     *          name.
+     */
+    public Builder(String name, int level) {
+      node = new MachineNode(name, level);
+    }
+
+    /**
+     * Set the physical memory of the node.
+     * @param memory Available RAM in KB.
+     */
+    public Builder setMemory(long memory) {
+      node.memory = memory;
+      return this;
+    }
+    
+    /**
+     * Set the number of map slot for the node.
+     * @param mapSlots The number of map slots for the node.
+     */
+    public Builder setMapSlots(int mapSlots) {
+      node.mapSlots = mapSlots;
+      return this;
+    }
+    
+    /**
+     * Set the number of reduce slots for the node.
+     * @param reduceSlots The number of reduce slots for the node.
+     */   
+    public Builder setReduceSlots(int reduceSlots) {
+      node.reduceSlots = reduceSlots;
+      return this;
+    }
+    
+    /**
+     * Set the amount of RAM reserved for each map slot.
+     * @param memoryPerMapSlot The amount of RAM reserved for each map slot, in KB.
+     */
+    public Builder setMemoryPerMapSlot(long memoryPerMapSlot) {
+      node.memoryPerMapSlot = memoryPerMapSlot;
+      return this;
+    }
+    
+    /**
+     * Set the amount of RAM reserved for each reduce slot.
+     * @param memoryPerReduceSlot The amount of RAM reserved for each reduce slot, in KB.
+     */
+    public Builder setMemoryPerReduceSlot(long memoryPerReduceSlot) {
+      node.memoryPerReduceSlot = memoryPerReduceSlot;
+      return this;
+    }
+    
+    /**
+     * Set the number of cores for the node.
+     * @param numCores Number of cores for the node.
+     */
+    public Builder setNumCores(int numCores) {
+      node.numCores = numCores;
+      return this;
+    }
+    
+    /**
+     * Clone the settings from a reference {@link MachineNode} object.
+     * @param ref The reference {@link MachineNode} object.
+     */
+    public Builder cloneFrom(MachineNode ref) {
+      node.memory = ref.memory;
+      node.mapSlots = ref.mapSlots;
+      node.reduceSlots = ref.reduceSlots;
+      node.memoryPerMapSlot = ref.memoryPerMapSlot;
+      node.memoryPerReduceSlot = ref.memoryPerReduceSlot;
+      node.numCores = ref.numCores;
+      return this;
+    }
+    
+    /**
+     * Build the {@link MachineNode} object.
+     * @return The {@link MachineNode} object being built.
+     */
+    public MachineNode build() {
+      MachineNode retVal = node;
+      node = null;
+      return retVal;
+    }
+  }
+}

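A minimal sketch of the builder in use; the host name, level, and resource sizes are illustrative values only:

  MachineNode node = new MachineNode.Builder("host1.example.com", 2)
      .setMemory(8L * 1024 * 1024)             // 8 GB, expressed in KB
      .setMapSlots(4)
      .setReduceSlots(2)
      .setMemoryPerMapSlot(2L * 1024 * 1024)   // 2 GB per map slot, in KB
      .setMemoryPerReduceSlot(2L * 1024 * 1024)
      .setNumCores(4)
      .build();
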
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java Sat Sep 12 09:31:34 2009
@@ -19,8 +19,11 @@
 
 import org.apache.hadoop.mapred.TaskStatus.State;
 
+/**
+ * {@link MapTaskAttemptInfo} represents the information about a map task
+ * attempt.
+ */
 public class MapTaskAttemptInfo extends TaskAttemptInfo {
-
   private long runtime;
 
   public MapTaskAttemptInfo(State state, TaskInfo taskInfo, long runtime) {

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Node.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Node.java?rev=814122&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Node.java (added)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Node.java Sat Sep 12 09:31:34 2009
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * {@link Node} represents a node in the cluster topology. A node can be a
+ * {@link MachineNode}, a {@link RackNode}, etc.
+ */
+public class Node implements Comparable<Node> {
+  private static final SortedSet<Node> EMPTY_SET = 
+    Collections.unmodifiableSortedSet(new TreeSet<Node>());
+  private Node parent;
+  private final String name;
+  private final int level;
+  private SortedSet<Node> children;
+
+  /**
+   * @param name
+   *          A unique name to identify a node in the cluster.
+   * @param level
+   *          The level of the node in the cluster.
+   */
+  public Node(String name, int level) {
+    if (name == null) {
+      throw new IllegalArgumentException("Node name cannot be null");
+    }
+
+    if (level < 0) {
+      throw new IllegalArgumentException("Level cannot be negative");
+    }
+
+    this.name = name;
+    this.level = level;
+  }
+
+  /**
+   * Get the name of the node.
+   * 
+   * @return The name of the node.
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Get the level of the node.
+   * @return The level of the node.
+   */
+  public int getLevel() {
+    return level;
+  }
+  
+  private void checkChildren() {
+    if (children == null) {
+      children = new TreeSet<Node>();
+    }
+  }
+
+  /**
+   * Add a child node to this node.
+   * @param child The child node to be added. The child node must not already belong to another cluster topology.
+   * @return Boolean indicating whether the node is successfully added.
+   */
+  public synchronized boolean addChild(Node child) {
+    if (child.parent != null) {
+      throw new IllegalArgumentException(
+          "The child is already under another node:" + child.parent);
+    }
+    checkChildren();
+    boolean retval = children.add(child);
+    if (retval) child.parent = this;
+    return retval;
+  }
+
+  /**
+   * Does this node have any children?
+   * @return Boolean indicating whether this node has any children.
+   */
+  public synchronized boolean hasChildren() {
+    return children != null && !children.isEmpty();
+  }
+
+  /**
+   * Get the children of this node.
+   * 
+   * @return The children of this node; an empty set is returned if there are
+   *         no children. The returned set is read-only.
+   */
+  public synchronized Set<Node> getChildren() {
+    return (children == null) ? EMPTY_SET : 
+      Collections.unmodifiableSortedSet(children);
+  }
+  
+  /**
+   * Get the parent node.
+   * @return The parent node, or null if this is the root node.
+   */
+  public Node getParent() {
+    return parent;
+  }
+  
+  @Override
+  public int hashCode() {
+    return name.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (obj.getClass() != this.getClass())
+      return false;
+    Node other = (Node) obj;
+    return name.equals(other.name);
+  }
+  
+  @Override
+  public String toString() {
+    return "(" + name +", " + level +")";
+  }
+
+  @Override
+  public int compareTo(Node o) {
+    return name.compareTo(o.name);
+  }
+}

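A small sketch of assembling a two-level topology with the base class; the names and levels are illustrative, and the machine is built with the MachineNode.Builder added above:

  Node root = new Node("<root>", 0);
  Node rack = new Node("rack0", 1);
  MachineNode host = new MachineNode.Builder("host0", 2).build();

  root.addChild(rack);    // returns true and sets rack's parent to root
  rack.addChild(host);    // a MachineNode can be a leaf but never a parent
  // host.getParent() == rack, rack.getParent() == root, host.hasChildren() is false.
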
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java Sat Sep 12 09:31:34 2009
@@ -20,7 +20,6 @@
 import java.util.regex.Pattern;
 import java.util.regex.Matcher;
 
-import java.io.StringReader;
 import java.io.InputStream;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -38,9 +37,8 @@
 import org.xml.sax.SAXException;
 
 class ParsedConfigFile {
-  static Pattern jobIDPattern = Pattern.compile("_(job_[0-9]+_[0-9]+)_");
-
-  static Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])");
+  private static final Pattern jobIDPattern = Pattern.compile("_(job_[0-9]+_[0-9]+)_");
+  private static final Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])");
 
   final int heapMegabytes;
 
@@ -69,6 +67,7 @@
     return oldValue;
   }
 
+  @SuppressWarnings("hiding")
   ParsedConfigFile(String filenameLine, String xmlString) {
     super();
 
@@ -121,6 +120,7 @@
         NodeList fields = prop.getChildNodes();
         String attr = null;
         String value = null;
+        @SuppressWarnings("unused")
         boolean finalParameter = false;
         for (int j = 0; j < fields.getLength(); j++) {
           Node fieldNode = fields.item(j);

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java Sat Sep 12 09:31:34 2009
@@ -23,27 +23,21 @@
 import java.util.regex.Matcher;
 
 class ParsedHost {
-  String rackName;
-  String nodeName;
-
-  private static Pattern splitPattern = Pattern
-      .compile("/([0-9\\\\\\.]+)/(.+)");
-
-  static int numberOfDistances() {
-    return 3;
-  }
+  private final String rackName;
+  private final String nodeName;
 
   /**
-   * @return the components, broadest first [ie., the last element is always the
-   *         individual node name]
+   * TODO the following only works for /rack/host format. Change to support
+   * arbitrary levels of network names.
    */
-  String[] nameComponents() {
-    String[] result = new String[2];
+  private static final Pattern splitPattern = Pattern
+      .compile("/([^/]+)/([^/]+)");
 
-    result[0] = rackName;
-    result[1] = nodeName;
-
-    return result;
+  /**
+   * TODO handle arbitrary levels of network names.
+   */
+  static int numberOfDistances() {
+    return 3;
   }
 
   String nameComponent(int i) throws IllegalArgumentException {
@@ -65,17 +59,14 @@
     return rackName.hashCode() * 17 + nodeName.hashCode();
   }
 
-  ParsedHost(String name) throws IllegalArgumentException {
+  public static ParsedHost parse(String name) {
     // separate out the node name
     Matcher matcher = splitPattern.matcher(name);
 
-    if (!matcher.matches()) {
-      throw new IllegalArgumentException("Illegal node designator: \"" + name
-          + "\"");
-    }
+    if (!matcher.matches())
+      return null;
 
-    rackName = matcher.group(1);
-    nodeName = matcher.group(2);
+    return new ParsedHost(matcher.group(1), matcher.group(2));
   }
 
   public ParsedHost(LoggedLocation loc) {
@@ -97,29 +88,28 @@
 
     return result;
   }
-
-  // expects the broadest name first
-  ParsedHost(String[] names) {
-    rackName = names[0];
-    nodeName = names[1];
+  
+  String getNodeName() {
+    return nodeName;
+  }
+  
+  String getRackName() {
+    return rackName;
   }
 
-  // returns the broadest name first
-  String[] nameList() {
-    String[] result = new String[2];
-
-    result[0] = rackName;
-    result[1] = nodeName;
-
-    return result;
+  // expects the broadest name first
+  ParsedHost(String rackName, String nodeName) {
+    this.rackName = rackName;
+    this.nodeName = nodeName;
   }
 
+  @Override
   public boolean equals(Object other) {
-    if (other instanceof ParsedHost) {
-      return distance((ParsedHost) other) == 0;
+    if (!(other instanceof ParsedHost)) {
+      return false;
     }
-
-    return false;
+    ParsedHost host = (ParsedHost) other;
+    return (nodeName.equals(host.nodeName) && rackName.equals(host.rackName));
   }
 
   int distance(ParsedHost other) {

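The new factory method changes the calling convention: names that do not match the /rack/host form now yield null instead of the old constructor's IllegalArgumentException. A short sketch with illustrative names:

  ParsedHost ok  = ParsedHost.parse("/rack1/host17");   // matches /rack/host
  ParsedHost bad = ParsedHost.parse("host17");          // no rack component
  // ok.getRackName() is "rack1", ok.getNodeName() is "host17"; bad is null.
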
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java Sat Sep 12 09:31:34 2009
@@ -20,24 +20,14 @@
 import java.util.Properties;
 import java.util.regex.Pattern;
 
-/**
- * 
- */
 class ParsedLine {
-
   Properties content;
-
   LogRecordType type;
 
-  static Pattern keyValPair = Pattern
+  static final Pattern keyValPair = Pattern
       .compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
 
-  /**
-	 * 
-	 */
-  ParsedLine() {
-  }
-
+  @SuppressWarnings("unused") 
   ParsedLine(String fullLine, int version) {
     super();
 


