hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r724531 [1/2] - in /hadoop/core/trunk: ./ src/contrib/vaidya/ src/contrib/vaidya/src/ src/contrib/vaidya/src/java/ src/contrib/vaidya/src/java/org/ src/contrib/vaidya/src/java/org/apache/ src/contrib/vaidya/src/java/org/apache/hadoop/ src/c...
Date Mon, 08 Dec 2008 22:45:39 GMT
Author: omalley
Date: Mon Dec  8 14:45:38 2008
New Revision: 724531

URL: http://svn.apache.org/viewvc?rev=724531&view=rev
Log:
HADOOP-4179. Add Vaidya tool to analyze map/reduce job logs for performance
problems. (Suhas Gogate via omalley)

Added:
    hadoop/core/trunk/src/contrib/vaidya/
    hadoop/core/trunk/src/contrib/vaidya/build.xml
    hadoop/core/trunk/src/contrib/vaidya/src/
    hadoop/core/trunk/src/contrib/vaidya/src/java/
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/DiagnosticTest.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/JobDiagnoser.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/PostExPerformanceDiagnoser.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/BalancedReducePartitioning.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapsReExecutionImpact.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReducesReExecutionImpact.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatisticsInterface.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/MapTaskStatistics.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/ReduceTaskStatistics.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/TaskStatistics.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/util/
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/util/XMLUtils.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/vaidya.sh
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/vaidya.xml
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=724531&r1=724530&r2=724531&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Dec  8 14:45:38 2008
@@ -64,14 +64,19 @@
     (szetszwo)
 
     HADOOP-4709. Add several new features and bug fixes to Chukwa.
-    (Jerome Boulon, Eric Yang, Andy Konwinski, Ariel Rabkin via cdouglas)
-      Added Hadoop Infrastructure Care Center (UI for visualize data collected by Chukwa)
+      Added Hadoop Infrastructure Care Center (UI for visualizing data collected
+                                               by Chukwa)
       Added FileAdaptor for streaming small file in one chunk
       Added compression to archive and demux output
-      Added unit tests and validation for agent, collector, and demux map reduce job
-      Added database loader for loading demux output (sequence file) to jdbc connected database
+      Added unit tests and validation for agent, collector, and demux map 
+        reduce job
+      Added database loader for loading demux output (sequence file) to jdbc 
+        connected database
       Added algorithm to distribute collector load more evenly
+    (Jerome Boulon, Eric Yang, Andy Konwinski, Ariel Rabkin via cdouglas)
 
+    HADOOP-4179. Add Vaidya tool to analyze map/reduce job logs for performance
+    problems. (Suhas Gogate via omalley)
 
   IMPROVEMENTS
 

Added: hadoop/core/trunk/src/contrib/vaidya/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/build.xml?rev=724531&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/build.xml (added)
+++ hadoop/core/trunk/src/contrib/vaidya/build.xml Mon Dec  8 14:45:38 2008
@@ -0,0 +1,68 @@
+<?xml version="1.0" ?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project name="vaidya" default="jar">
+
+	<!-- Common contrib targets/properties (build.dir, build.classes, dist.dir, ...) -->
+	<import file="../build-contrib.xml" />
+
+	<!-- Create the build directories and stage the launcher script and default tests config -->
+	<target name="init">
+		<mkdir dir="${build.dir}" />
+		<mkdir dir="${build.classes}" />
+		<mkdir dir="${build.dir}/bin" />
+		<mkdir dir="${build.dir}/conf" />
+
+		<copy todir="${build.dir}/bin">
+			<!-- copy the vaidya command script (vaidya.sh) into ${build.dir}/bin -->
+			<fileset dir="${basedir}/src/java/org/apache/hadoop/vaidya">
+				<include name="vaidya.sh" />
+			</fileset>
+		</copy>
+
+		<copy todir="${build.dir}/conf">
+			<!-- copy the vaidya tests config file (postex_diagnosis_tests.xml) into ${build.dir}/conf -->
+			<fileset dir="${basedir}/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests">
+				<include name="postex_diagnosis_tests.xml" />
+			</fileset>
+		</copy>
+	</target>
+
+	<!-- ====================================================== -->
+	<!-- Override jar target to include the tests conf xml file -->
+	<!-- ====================================================== -->
+	<target name="jar" depends="compile" unless="skip.contrib">
+		<echo message="contrib: ${name}" />
+		<jar jarfile="${build.dir}/hadoop-${version}-${name}.jar">
+			<fileset dir="${build.classes}" />
+			<!-- placed at the jar root so it can be loaded as a classpath resource -->
+			<fileset dir="${basedir}/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests">
+				<include name="postex_diagnosis_tests.xml" />
+			</fileset>
+		</jar>
+	</target>
+
+	<!-- Copy everything built (except compiled classes) to dist and make bin/ scripts executable -->
+	<target name="package" depends="jar">
+		<mkdir dir="${dist.dir}/contrib/${name}" />
+		<copy todir="${dist.dir}/contrib/${name}" includeEmptyDirs="false">
+			<fileset dir="${build.dir}">
+				<exclude name="**/classes/" />
+			</fileset>
+		</copy>
+		<chmod dir="${dist.dir}/contrib/${name}/bin" perm="a+x" includes="*" />
+	</target>
+
+</project>

Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/DiagnosticTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/DiagnosticTest.java?rev=724531&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/DiagnosticTest.java (added)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/DiagnosticTest.java Mon Dec  8 14:45:38 2008
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.vaidya;
+
+import java.lang.Runnable;
+import org.apache.hadoop.vaidya.statistics.job.*;
+import org.apache.hadoop.vaidya.util.*;
+import org.w3c.dom.Node;
+import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Element;
+
+/*
+ * This is an abstract base class to be extended by each diagnostic test 
+ * class. It implements Runnable interface so that if required multiple tests 
+ * can be run in parallel. 
+ */
+public abstract class DiagnosticTest implements Runnable {
+  
+  private static final double HIGHVAL = 0.99;
+  private static final double MEDIUMVAL = 0.66;
+  private static final double LOWVAL = 0.33;
+  
+  /*
+   * Job statistics are passed to this class against which this diagnostic 
+   * test is evaluated.
+   */
+  private JobStatistics _jobExecutionStats;
+  private Element _testConfigElement;
+  private double _impactLevel;
+  private boolean _evaluated;
+  private boolean _testPassed; 
+  
+  /* 
+   * Checks if test is already evaluated against job execution statistics
+   * @return - true if test is already evaluated once.
+   */
+  public boolean isEvaluated() {
+    return _evaluated;
+  }
+
+  /*
+   * If impact level (returned by evaluate method) is less than success threshold 
+   * then test is passed (NEGATIVE) else failed (POSITIVE) which inturn indicates the 
+   * problem with job performance  
+   */
+  public boolean istestPassed() {
+    return this._testPassed;
+  }
+  
+  
+  /*
+   * Initialize the globals
+   */
+  public void initGlobals (JobStatistics jobExecutionStats, Element testConfigElement) {
+    this._jobExecutionStats = jobExecutionStats;
+    this._testConfigElement = testConfigElement;
+  }
+  
+  /*
+   * Returns a prescription/advice (formated text) based on the evaluation of 
+   * diagnostic test condition (evaluate method). Individual test should override 
+   * and implement it. If the value returned is null then the prescription advice
+   * is printed as provided in the test config file.  
+   */
+  public abstract String getPrescription();
+  
+  /*
+   * This method prints any reference details to support the test result. Individual
+   * test needs to override and implement it and information printed is specific 
+   * to individual test. 
+   */
+  public abstract String getReferenceDetails ();
+  
+  /*
+   * Evaluates diagnostic condition and returns impact level (value [0..1])
+   * Typically this method calculates the impact of a diagnosed condition on the job performance
+   * (Note: for boolean conditions it is either 0 or 1).
+   */
+  public abstract double evaluate (JobStatistics jobExecutionStats);
+  
+  /*
+   * Get the Title information for this test as set in the test config file
+   */
+  public String getTitle() throws Exception {
+    return XMLUtils.getElementValue("Title", this._testConfigElement);
+  }
+  
+  /*
+   * Get the Description information as set in the test config file.
+   */
+  public String getDescription() throws Exception {
+    return XMLUtils.getElementValue("Description", this._testConfigElement);
+  }
+  
+  /*
+   * Get the Importance value as set in the test config file.
+   */
+  public double getImportance() throws Exception {  
+    if (XMLUtils.getElementValue("Importance", this._testConfigElement).equalsIgnoreCase("high")) {
+      return HIGHVAL;
+    } else if (XMLUtils.getElementValue("Importance", this._testConfigElement).equalsIgnoreCase("medium")) {
+      return MEDIUMVAL;
+    } else {
+      return LOWVAL;
+    }
+  }
+  
+  /*
+   * Returns the impact level of this test condition. This value is calculated and
+   * returned by evaluate method.
+   */
+  public double getImpactLevel() throws Exception {
+    if (!this.isEvaluated()) {
+      throw new Exception("Test has not been evaluated");
+    }
+    return truncate(this._impactLevel);
+  }
+
+  /* 
+   * Get the severity level as specified in the test config file.
+   */
+  public double getSeverityLevel() throws Exception {
+    return truncate ((double)(getImportance()*getImpactLevel()));
+  }
+
+  /*
+   * Get Success Threshold as specified in the test config file.
+   */
+  public double getSuccessThreshold() throws Exception {
+    double x = Double.parseDouble(XMLUtils.getElementValue("SuccessThreshold", this._testConfigElement));
+    return truncate (x);
+  }
+  
+  /*
+   * Creates and returns the report element for this test based on the 
+   * test evaluation results.
+   */
+  public Element getReportElement(Document doc, Node parent) throws Exception {
+    /* 
+     * If test is not evaluated yet then throw exception
+     */
+    if (!this.isEvaluated()) {
+      throw new Exception("Test has not been evaluated");
+    }
+    
+    /*
+     * Construct and return the report element
+     */
+    // Insert Child ReportElement
+    Node reportElement = doc.createElement("TestReportElement");
+    parent.appendChild(reportElement);
+
+    // Insert title
+    Node item = doc.createElement("TestTitle");
+    reportElement.appendChild(item);
+    Node value = doc.createTextNode(this.getTitle());
+    item.appendChild(value);
+
+    // Insert description
+    item = doc.createElement("TestDescription");
+    reportElement.appendChild(item);
+    value = doc.createTextNode(this.getDescription());
+    item.appendChild(value);
+
+    // Insert Importance
+    item = doc.createElement("TestImportance");
+    reportElement.appendChild(item);
+    String imp;
+    if (this.getImportance() == HIGHVAL) {
+      imp = "HIGH";
+    } else if (this.getImportance() == MEDIUMVAL) {
+      imp = "MEDIUM";
+    } else { 
+      imp = "LOW";
+    }
+    value = doc.createTextNode(imp);
+    item.appendChild(value);
+
+    // Insert Importance
+    item = doc.createElement("TestResult");
+    reportElement.appendChild(item);
+    if (this._testPassed) {
+      value = doc.createTextNode("NEGATIVE(PASSED)");
+    } else {
+      value = doc.createTextNode("POSITIVE(FAILED)");
+    }
+    item.appendChild(value);
+      
+    // TODO : if (!this._testPassed) {
+    // Insert Severity
+    item = doc.createElement("TestSeverity");
+    reportElement.appendChild(item);
+    value = doc.createTextNode(""+this.getSeverityLevel());
+    item.appendChild(value);
+    
+    // Insert Reference Details
+    item = doc.createElement("ReferenceDetails");
+    reportElement.appendChild(item);
+    value = doc.createTextNode(""+this.getReferenceDetails());
+    item.appendChild(value);
+    
+    // Insert Prescription Advice
+    item = doc.createElement("TestPrescription");
+    String val = this.getPrescription();
+    if (val == null) {
+      val = XMLUtils.getElementValue("Prescription", this._testConfigElement);
+    }
+    reportElement.appendChild(item);
+    value = doc.createTextNode(""+val);
+    item.appendChild(value);
+    // }
+    return (Element)reportElement;
+  }
+  
+  
+  /* 
+   * (non-Javadoc)
+   * @see java.lang.Runnable#run()
+   */
+  public void run() {
+    /*
+     * Evaluate the test
+     */
+    this._impactLevel = this.evaluate(this._jobExecutionStats);
+    this._evaluated = true;
+    try {
+      if (this._impactLevel >= this.getSuccessThreshold()) {
+      this._testPassed = false;
+      } else {
+      this._testPassed = true;
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+  
+  /*
+   * Returns value of element of type long part of InputElement of diagnostic 
+   * rule
+   */
+  protected long getInputElementLongValue (String elementName, long defaultValue) {
+    Element inputElement = (Element)(this._testConfigElement.getElementsByTagName("InputElement").item(0));
+    Element prs = null; long value;
+    prs = (Element)inputElement.getElementsByTagName(elementName).item(0);
+    if (prs != null) {
+      value = Long.parseLong(prs.getFirstChild().getNodeValue().trim());
+    } else {
+      value = defaultValue;
+    }
+    return value;
+  }
+  
+  /*
+   * Returns value of element of type double part of InputElement of diagnostic rule
+   */
+  protected double getInputElementDoubleValue(String elementName, double defaultValue) {
+    Element inputElement = (Element)(this._testConfigElement.getElementsByTagName("InputElement").item(0));
+    Element prs = null; double value;
+    prs = (Element)inputElement.getElementsByTagName(elementName).item(0);
+    if (prs != null) {
+      value = Double.parseDouble(prs.getFirstChild().getNodeValue().trim());
+    } else {
+      value = defaultValue;
+    }
+    return value;
+  }
+  
+  /*
+   * Returns value of element of type String part of InputElement of diagnostic rule
+   */
+  protected String getInputElementStringValue(String elementName, String defaultValue) {
+    Element inputElement = (Element)(this._testConfigElement.getElementsByTagName("InputElement").item(0));
+    Element prs = null; String value;
+    prs = (Element)inputElement.getElementsByTagName(elementName).item(0);
+    if (prs != null) {
+      value = prs.getFirstChild().getNodeValue().trim();
+    } else {
+      value = defaultValue;
+    }
+    return value;
+  }
+  
+  /*
+   * truncate doubles to 2 digit.
+   */
+  public static double truncate(double x)
+  {
+      long y=(long)(x*100);
+      return (double)y/100;
+  }
+}

Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/JobDiagnoser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/JobDiagnoser.java?rev=724531&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/JobDiagnoser.java (added)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/JobDiagnoser.java Mon Dec  8 14:45:38 2008
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.vaidya;
+
+import org.apache.hadoop.vaidya.util.XMLUtils;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+
+/**
+ * This is a base driver class for job diagnostics. Various specialty drivers
+ * that test specific aspects of job problems, e.g. PostExPerformanceDiagnoser,
+ * extend this base class.
+ */
+public class JobDiagnoser {
+
+  /*
+   * XML document containing report elements, one for each rule tested
+   */
+  private final Document _report;
+
+  /**
+   * @return the report document
+   */
+  public Document getReport() {
+    return this._report;
+  }
+
+  /**
+   * Constructor. It initializes the report document with an empty
+   * PostExPerformanceDiagnosticReport root element, ready to receive the
+   * child report elements.
+   *
+   * @throws Exception a {@link ParserConfigurationException} if no XML
+   *         DocumentBuilder can be created
+   */
+  public JobDiagnoser () throws Exception {
+    // Let parser configuration failures propagate: continuing with a null
+    // report document would only fail below with a NullPointerException.
+    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+    DocumentBuilder builder = factory.newDocumentBuilder();
+    this._report = builder.newDocument();
+
+    // Insert Root Element
+    Element root = this._report.createElement("PostExPerformanceDiagnosticReport");
+    this._report.appendChild(root);
+  }
+
+  /*
+   * Print the report document to console
+   */
+  public void printReport() {
+    XMLUtils.printDOM(this._report);
+  }
+
+  /*
+   * Save the report document to the specified report file
+   * @param filename : path of report file.
+   */
+  public void saveReport(String filename) {
+    XMLUtils.writeXmlToFile(filename, this._report);
+  }
+}

Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/PostExPerformanceDiagnoser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/PostExPerformanceDiagnoser.java?rev=724531&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/PostExPerformanceDiagnoser.java (added)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/PostExPerformanceDiagnoser.java Mon Dec  8 14:45:38 2008
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.vaidya.postexdiagnosis;
+
+
+import java.net.URL;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobHistory.JobInfo;
+import org.apache.hadoop.mapred.DefaultJobHistoryParser;
+import org.apache.hadoop.vaidya.util.XMLUtils;
+import org.apache.hadoop.vaidya.DiagnosticTest;
+import org.apache.hadoop.vaidya.JobDiagnoser;
+import org.apache.hadoop.vaidya.statistics.job.JobStatistics;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+
+/**
+ * This class acts as a driver or rule engine for executing the post execution
+ * performance diagnostic tests of a map/reduce job. It prints or saves the
+ * diagnostic report as an XML document.
+ */
+public class PostExPerformanceDiagnoser extends JobDiagnoser {
+
+  private String _jobHistoryFile = null;
+  private InputStream _testsConfFileIs = null;
+  private String _reportFile = null;
+  private String _jobConfFile = null;
+
+  /*
+   * Data available for analysts to write post execution performance diagnostic rules
+   */
+  private JobStatistics _jobExecutionStatistics;
+
+  /*
+   * Get the report file where diagnostic report is to be saved (may be null)
+   */
+  public String getReportFile () {
+    return this._reportFile;
+  }
+
+  /*
+   * Get the job history log file used in collecting the job counters
+   */
+  public String getJobHistoryFile () {
+    return this._jobHistoryFile;
+  }
+
+  /*
+   * Get the input stream of the test configuration file where all the
+   * diagnostic tests are registered with their configuration information.
+   */
+  public InputStream getTestsConfFileIs () {
+    return this._testsConfFileIs;
+  }
+
+  /*
+   * Set the input stream of the test configuration file
+   */
+  public void setTestsConfFileIs (InputStream testsConfFileIs) {
+    this._testsConfFileIs = testsConfFileIs;
+  }
+
+  /**
+   * @return JobStatistics - Object storing the job configuration and execution
+   * counters and statistics information
+   */
+  public JobStatistics getJobExecutionStatistics() {
+    return _jobExecutionStatistics;
+  }
+
+  /**
+   * Reads the job configuration and job history log and builds the job
+   * execution statistics.
+   *
+   * @param jobConfFile - URL pointing to job configuration (job_conf.xml) file
+   * @param jobHistoryFile - URL pointing to job history log file
+   * @param testsConfFileIs - input stream of the test configuration file
+   * (optional, may be null). main() substitutes the postex_diagnosis_tests.xml
+   * classpath resource via setTestsConfFileIs when it is null.
+   * @param reportFile - file path for storing report (optional, may be null)
+   */
+  public PostExPerformanceDiagnoser (String jobConfFile, String jobHistoryFile, InputStream testsConfFileIs,
+                String reportFile) throws Exception {
+
+    this._jobHistoryFile = jobHistoryFile;
+    this._testsConfFileIs = testsConfFileIs;
+    this._reportFile = reportFile;
+    this._jobConfFile = jobConfFile;
+
+    /*
+     * Read the job information necessary for post performance analysis
+     */
+    JobConf jobConf = new JobConf();
+    JobInfo jobInfo = new JobInfo("");
+    readJobInformation(jobConf, jobInfo);
+    this._jobExecutionStatistics = new JobStatistics(jobConf, jobInfo);
+  }
+
+  /**
+   * Read and populate job statistics information.
+   * NOTE(review): java.net.URL rejects protocols that have no registered
+   * URLStreamHandler, so hdfs:// URLs only work if an hdfs handler has been
+   * registered in this JVM — confirm before relying on HDFS input.
+   */
+  private void readJobInformation(JobConf jobConf, JobInfo jobInfo) throws Exception {
+
+    /*
+     * Convert the input strings to URL
+     */
+    URL jobConfFileUrl = new URL(this._jobConfFile);
+    URL jobHistoryFileUrl = new URL(this._jobHistoryFile);
+
+    /*
+     * Read the Job Configuration from the jobConfFile url
+     */
+    jobConf.addResource(jobConfFileUrl);
+
+    /*
+     * Read JobHistoryFile and build job counters to evaluate diagnostic rules
+     */
+    String protocol = jobHistoryFileUrl.getProtocol();
+    if ("hdfs".equals(protocol)) {
+      DefaultJobHistoryParser.parseJobTasks(jobHistoryFileUrl.getPath(), jobInfo, FileSystem.get(jobConf));
+    } else if ("file".equals(protocol)) {
+      DefaultJobHistoryParser.parseJobTasks(jobHistoryFileUrl.getPath(), jobInfo, FileSystem.getLocal(jobConf));
+    } else {
+      throw new Exception("Malformed URL. Protocol: "+protocol);
+    }
+  }
+
+  /*
+   * print Help
+   */
+  private static void printHelp() {
+    System.out.println("Usage:");
+    System.out.println("PostExPerformanceDiagnoser -jobconf <fileurl> -joblog <fileurl> [-testconf <filepath>] [-report <filepath>]");
+    System.out.println();
+    System.out.println("-jobconf <fileurl>     : File path for job configuration file (e.g. job_xxxx_conf.xml). It can be on HDFS or");
+    System.out.println("                       : local file system. It should be specified in the URL format.");
+    System.out.println("                       : e.g. local file => file://localhost/Users/hadoop-user/job_0001_conf.xml");
+    System.out.println("                       : e.g. hdfs file  => hdfs://namenode:port/Users/hadoop-user/hodlogs/.../job_0001_conf.xml");
+    System.out.println();
+    System.out.println("-joblog <fileurl>      : File path for job history log file. It can be on HDFS or local file system.");
+    System.out.println("                       : It should be specified in the URL format.");
+    System.out.println();
+    System.out.println("-testconf <filepath>   : Optional file path for performance advisor tests configuration file. It should be available");
+    System.out.println("                       : on local file system and be specified as an absolute file path.");
+    System.out.println("                       : e.g. => /Users/hadoop-user/postex_diagnosis_tests.xml. If not specified default file will be used");
+    System.out.println("                       : from the hadoop-{ver}-vaidya.jar in a classpath.");
+    System.out.println("                       : For user to view or make local copy of default tests, file is available at $HADOOP_HOME/contrib/vaidya/conf/postex_diagnosis_tests.xml");
+    System.out.println();
+    System.out.println("-report <filepath>     : Optional file path for storing diagnostic report in XML format. Path should be available");
+    System.out.println("                       : on local file system and be specified as an absolute file path.");
+    System.out.println("                       : e.g. => /Users/hadoop-user/postex_diagnosis_report.xml. If not specified report will be printed on console");
+    System.out.println();
+    System.out.println("-help                  : prints this usage");
+    System.out.println();
+  }
+
+  /*
+   * Closes the given stream, ignoring null and any IOException raised on close.
+   */
+  private static void closeQuietly(InputStream is) {
+    if (is == null) {
+      return;
+    }
+    try {
+      is.close();
+    } catch (java.io.IOException ignored) {
+      // A failure to close the tests config stream is not actionable here.
+    }
+  }
+
+  /**
+   * Command line entry point; see printHelp() for the accepted options.
+   * @param args command line arguments
+   */
+  public static void main(String[] args) {
+
+    String jobconffile = null;
+    String joblogfile = null;
+    InputStream testsconffileis = null;
+    String reportfile = null;
+
+    /*
+     * Parse the command line arguments: "-help" may appear anywhere; every
+     * other option takes exactly one value.
+     */
+    try {
+      for (int i = 0; i < args.length; i++) {
+        String option = args[i];
+        if (option.equalsIgnoreCase("-help")) {
+          printHelp();
+          return;
+        }
+        if (i + 1 >= args.length) {
+          System.out.println("Invalid arguments: missing value for " + option);
+          printHelp();
+          return;
+        }
+        String value = args[++i];
+        if (option.equalsIgnoreCase("-jobconf")) {
+          jobconffile = value;
+        } else if (option.equalsIgnoreCase("-joblog")) {
+          joblogfile = value;
+        } else if (option.equalsIgnoreCase("-testconf")) {
+          testsconffileis = new FileInputStream(new java.io.File(value));
+        } else if (option.equalsIgnoreCase("-report")) {
+          reportfile = value;
+        } else {
+          printHelp();
+          return;
+        }
+      }
+    } catch (Exception e) {
+      System.out.println ("Invalid arguments.");
+      e.printStackTrace();
+      System.out.println();
+      printHelp();
+      return;
+    }
+
+    // Check if required arguments are specified
+    if (jobconffile == null || joblogfile == null) {
+      System.out.println ("Invalid arguments: -jobconf or -joblog arguments are missing");
+      printHelp();
+      return;
+    }
+
+    // Whichever tests config stream ends up being used; closed in finally.
+    InputStream testsConfIs = testsconffileis;
+    try {
+      /*
+       * Create performance advisor and read job execution statistics
+       */
+      PostExPerformanceDiagnoser pa = new PostExPerformanceDiagnoser(jobconffile, joblogfile, testsconffileis, reportfile);
+
+      /*
+       * Fall back to the default tests configuration on the classpath
+       */
+      if (pa.getTestsConfFileIs() == null) {
+        testsConfIs = Thread.currentThread().getContextClassLoader().getResourceAsStream("postex_diagnosis_tests.xml");
+        if (testsConfIs == null) {
+          throw new Exception("Default tests configuration postex_diagnosis_tests.xml not found in classpath");
+        }
+        pa.setTestsConfFileIs(testsConfIs);
+      }
+
+      /*
+       * Parse the tests configuration file
+       */
+      Document rulesDoc = XMLUtils.parse(pa.getTestsConfFileIs());
+
+      /*
+       * Read the diagnostic rule entries from the config file.
+       * For every rule load the rule class, run it and append its report
+       * element under the report root.
+       */
+      NodeList list = rulesDoc.getElementsByTagName("DiagnosticTest");
+      Element root = (Element) pa.getReport().getElementsByTagName("PostExPerformanceDiagnosticReport").item(0);
+      for (int i = 0; i < list.getLength(); i++) {
+        Element dRule = (Element) list.item(i);
+        Element cn = (Element) dRule.getElementsByTagName("ClassName").item(0);
+        String className = cn.getFirstChild().getNodeValue().trim();
+        Class<?> rc = Class.forName(className);
+        DiagnosticTest test = (DiagnosticTest) rc.newInstance();
+        test.initGlobals(pa.getJobExecutionStatistics(), dRule);
+        test.run();
+        // getReportElement appends the test's report element to root itself
+        test.getReportElement(pa.getReport(), root);
+      }
+
+      // Print the report, or save it when a report file was given
+      if (pa.getReportFile() == null) {
+        pa.printReport();
+      } else {
+        pa.saveReport(pa.getReportFile());
+      }
+    } catch (Exception e) {
+      System.out.print("Exception:"+e);
+      e.printStackTrace();
+    } finally {
+      closeQuietly(testsConfIs);
+    }
+  }
+}

Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/BalancedReducePartitioning.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/BalancedReducePartitioning.java?rev=724531&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/BalancedReducePartitioning.java (added)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/BalancedReducePartitioning.java Mon Dec  8 14:45:38 2008
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.vaidya.postexdiagnosis.tests;
+
+import org.apache.hadoop.vaidya.statistics.job.JobStatistics;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.KeyDataType;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.ReduceTaskKeys;
+import org.apache.hadoop.vaidya.statistics.job.ReduceTaskStatistics;
+import org.apache.hadoop.vaidya.DiagnosticTest;
+import org.w3c.dom.Element;
+import java.util.Hashtable;
+import java.util.List;
+
+/**
+ * Diagnostic test that checks how evenly reduce input records are spread
+ * across the reduce tasks of a job.
+ *
+ * <p>Starting from the reduce task with the most input records, the test
+ * counts how many reducers ("busy reducers") are needed to cover at least
+ * {@code PercentReduceRecords} of the job's reduce input records. The impact
+ * is {@code 1 - busyReducers / totalReduces}: the fewer reducers needed to
+ * cover that share, the more skewed the partitioning.
+ */
+public class BalancedReducePartitioning extends DiagnosticTest {
+
+  /** Total number of reduce tasks (JobKeys.TOTAL_REDUCES). */
+  private long totalReduces;
+  /** Number of largest reducers needed to reach {@link #percentReduceRecordsSize}. */
+  private long busyReducers;
+  /** Record count equal to {@link #percent} of all reduce input records. */
+  private long percentReduceRecordsSize;
+  /** Configured share of reduce input records (input element PercentReduceRecords). */
+  private double percent;
+  /** Impact computed by the most recent call to {@link #evaluate}. */
+  private double impact;
+
+  /**
+   * Creates the test; its configuration is read later in {@link #evaluate}.
+   */
+  public BalancedReducePartitioning() {
+  }
+
+  /**
+   * Computes the reduce-partitioning imbalance impact.
+   *
+   * @param jobExecutionStats statistics of the job being diagnosed
+   * @return {@code 1 - busyReducers / totalReduces}, or 0 when the job has
+   *         no reduce tasks (there is nothing to balance)
+   */
+  @Override
+  public double evaluate(JobStatistics jobExecutionStats) {
+
+    /*
+     * Read this rule specific input PercentReduceRecords (a fraction; default 0.90)
+     */
+    this.percent = getInputElementDoubleValue("PercentReduceRecords", 0.90);
+    this.totalReduces = jobExecutionStats.getLongValue(JobKeys.TOTAL_REDUCES);
+    this.busyReducers = 0;
+
+    /*
+     * Map-only job: there is no reduce partitioning to evaluate. Without this
+     * guard the impact would be computed as 0/0 = NaN.
+     */
+    if (this.totalReduces <= 0) {
+      this.percentReduceRecordsSize = 0;
+      this.impact = 0.0;
+      return this.impact;
+    }
+
+    /*
+     * Get the sorted reduce task list by number of INPUT_RECORDS (ascending)
+     */
+    List<ReduceTaskStatistics> srTaskList =
+        jobExecutionStats.getReduceTaskList(ReduceTaskKeys.INPUT_RECORDS, KeyDataType.LONG);
+    this.percentReduceRecordsSize =
+        (long) (this.percent * jobExecutionStats.getLongValue(JobKeys.REDUCE_INPUT_RECORDS));
+
+    /*
+     * Walk from the busiest reducer (end of the ascending list) downwards
+     * until the configured share of records is covered.
+     */
+    long tempReduceRecordsCount = 0;
+    for (int i = srTaskList.size() - 1; i >= 0; i--) {
+      tempReduceRecordsCount += srTaskList.get(i).getLongValue(ReduceTaskKeys.INPUT_RECORDS);
+      this.busyReducers++;
+      if (tempReduceRecordsCount >= this.percentReduceRecordsSize) {
+        break;
+      }
+    }
+
+    // Calculate Impact
+    this.impact = 1.0 - (double) this.busyReducers / (double) this.totalReduces;
+    return this.impact;
+  }
+
+  /**
+   * Helper that prints one reduce counter for every reduce task to stdout.
+   *
+   * @param x   per-task counter maps
+   * @param key counter to print
+   */
+  public void printReduceCounters (List<Hashtable<ReduceTaskKeys, String>> x, ReduceTaskKeys key) {
+    for (int i=0; i<x.size(); i++) {
+      System.out.println("ind:"+i+", Value:<"+x.get(i).get(key)+">");
+    }
+  }
+
+  /**
+   * Returns advice for fixing unbalanced reduce partitioning.
+   */
+  @Override
+  public String getPrescription() {
+    return 
+    "* Use the appropriate partitioning function"+ "\n" +
+    "* For streaming job consider following partitioner and hadoop config parameters\n"+
+    "  * org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner\n" +
+    "  * -jobconf stream.map.output.field.separator, -jobconf stream.num.map.output.key.fields";
+  }
+
+  /**
+   * Returns the values that the last evaluation was based on. The configured
+   * fraction is reported as a whole percentage (e.g. 0.85 -> 85%).
+   */
+  @Override
+  public String getReferenceDetails() {
+    String ref = 
+    "* TotalReduceTasks: "+this.totalReduces+"\n"+
+    "* BusyReduceTasks processing "+Math.round(this.percent * 100)+ "% of total records: " +this.busyReducers+"\n"+
+    "* Impact: "+truncate(this.impact);
+    return ref;
+  }
+}

Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java?rev=724531&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java (added)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java Mon Dec  8 14:45:38 2008
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.vaidya.postexdiagnosis.tests;
+
+import org.apache.hadoop.vaidya.statistics.job.JobStatistics;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.KeyDataType;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.MapTaskKeys;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.ReduceTaskKeys;
+import org.apache.hadoop.vaidya.statistics.job.MapTaskStatistics;
+import org.apache.hadoop.vaidya.DiagnosticTest;
+import org.w3c.dom.Element;
+import java.util.Hashtable;
+import java.util.List;
+
+/**
+ * Diagnostic test that checks whether map tasks spill data to local disk
+ * during the map-side sort.
+ *
+ * <p>The spill is estimated from LOCAL_BYTES_WRITTEN summed over all map
+ * tasks. For jobs with reducers, the map output bytes are subtracted first;
+ * for map-only jobs, all local bytes are counted. The result, as a multiple of
+ * MAP_OUTPUT_BYTES, is divided by the {@code NormalizationFactor} input
+ * element and capped at 1.
+ */
+public class MapSideDiskSpill extends DiagnosticTest {
+
+  /** Impact computed by the most recent call to {@link #evaluate}. */
+  private double _impact;
+  /** Statistics of the job evaluated last; read by the report methods. */
+  private JobStatistics _job;
+  /** Sum of LOCAL_BYTES_WRITTEN over all map tasks. */
+  private long _numLocalBytesWrittenByMaps;
+  
+  /**
+   * Creates the test; its configuration is read later in {@link #evaluate}.
+   */
+  public MapSideDiskSpill() {
+  }
+
+  /**
+   * Computes the normalized map-side spill impact.
+   *
+   * @param job statistics of the job being diagnosed
+   * @return impact in [0, 1]; 0 when the job reports no map output bytes
+   */
+  @Override
+  public double evaluate(JobStatistics job) {
+
+    /*
+     * Set the this._job
+     */
+    this._job = job;
+
+    /*
+     * Read the Normalization Factor
+     */
+    double normF = getInputElementDoubleValue("NormalizationFactor", 3.0);
+
+    /*
+     * Sum the local bytes written by every map task
+     */
+    List<MapTaskStatistics> srTaskList = job.getMapTaskList(MapTaskKeys.LOCAL_BYTES_WRITTEN, KeyDataType.LONG);
+    long numLocalBytesWrittenByMaps = 0;
+    for (MapTaskStatistics mapTask : srTaskList) {
+      numLocalBytesWrittenByMaps += mapTask.getLongValue(MapTaskKeys.LOCAL_BYTES_WRITTEN);
+    }
+    this._numLocalBytesWrittenByMaps = numLocalBytesWrittenByMaps;
+
+    /*
+     * No map output: the ratio is undefined (integer division would throw
+     * ArithmeticException), so report no spill impact.
+     */
+    long mapOutputBytes = job.getLongValue(JobKeys.MAP_OUTPUT_BYTES);
+    if (mapOutputBytes <= 0) {
+      this._impact = 0.0;
+      return this._impact;
+    }
+
+    /*
+     * Map only job vs. map reduce job. Divide in floating point; long/long
+     * division would truncate the ratio to a whole number.
+     */
+    double spillRatio;
+    if (job.getLongValue(JobKeys.TOTAL_REDUCES) > 0) {
+      spillRatio = (double) (numLocalBytesWrittenByMaps - mapOutputBytes) / (double) mapOutputBytes;
+    } else {
+      spillRatio = (double) numLocalBytesWrittenByMaps / (double) mapOutputBytes;
+    }
+    // Fewer local bytes than map output bytes means no spill, not a negative impact.
+    spillRatio = Math.max(0.0, spillRatio);
+
+    /*
+     * Normalize into [0, 1]; anything beyond the normalization factor is maximal.
+     */
+    if (spillRatio > normF) {
+      this._impact = 1.0;
+    } else if (normF > 0.0) {
+      this._impact = spillRatio / normF;
+    } else {
+      this._impact = 0.0;
+    }
+
+    return this._impact;
+  }
+
+  /**
+   * Returns advice for reducing map-side spills. It includes the job's current
+   * sort-buffer settings. The percent settings are fractions, so they are read
+   * as floats; {@code getInt} fails to parse values like 0.80 and returns 0.
+   */
+  @Override
+  public String getPrescription() {
+    return 
+    "* Use combiner to lower the map output size.\n" +
+      "* Increase map side sort buffer size (io.sort.mb:"+this._job.getJobConf().getInt("io.sort.mb", 0) + ").\n" +
+      "* Increase index buffer size (io.sort.record.percent:"+ this._job.getJobConf().getFloat("io.sort.record.percent", 0.0f) + ") if number of Map Output Records are large. \n" +
+      "* Increase (io.sort.spill.percent:"+ this._job.getJobConf().getFloat("io.sort.spill.percent", 0.0f) + "), default 0.80 i.e. 80% of sort buffer size & index buffer size. \n";
+  }
+
+  /**
+   * Returns the byte counts that the last evaluation was based on.
+   */
+  @Override
+  public String getReferenceDetails() {
+    String ref = 
+    "* TotalMapOutputBytes: "+this._job.getLongValue(JobKeys.MAP_OUTPUT_BYTES)+"\n"+
+    "* Total Local Bytes Written by Maps: "+this._numLocalBytesWrittenByMaps+"\n"+
+    "* Impact: "+ truncate(this._impact);
+    return ref;
+  }
+}

Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapsReExecutionImpact.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapsReExecutionImpact.java?rev=724531&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapsReExecutionImpact.java (added)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapsReExecutionImpact.java Mon Dec  8 14:45:38 2008
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.vaidya.postexdiagnosis.tests;
+
+import org.apache.hadoop.vaidya.statistics.job.JobStatistics;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.KeyDataType;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.ReduceTaskKeys;
+import org.apache.hadoop.vaidya.statistics.job.ReduceTaskStatistics;
+import org.apache.hadoop.vaidya.DiagnosticTest;
+import org.w3c.dom.Element;
+import java.util.Hashtable;
+import java.util.List;
+
+/**
+ *
+ */
+public class MapsReExecutionImpact extends DiagnosticTest {
+
+  private double _impact;
+  private JobStatistics _job;
+  private long _percentMapsReExecuted;
+  
+  
+  /**
+   * 
+   */
+  public MapsReExecutionImpact() {
+  }
+
+  /*
+   * Evaluate the test    
+   */
+  @Override
+  public double evaluate(JobStatistics job) {
+    
+    /*
+     * Set the this._job
+     */
+    this._job = job;
+    
+    /*
+     * Calculate and return the impact
+     */
+    this._impact = ((job.getLongValue(JobKeys.LAUNCHED_MAPS) - job.getLongValue(JobKeys.TOTAL_MAPS))/job.getLongValue(JobKeys.TOTAL_MAPS));
+    this._percentMapsReExecuted = Math.round(this._impact * 100);
+    return this._impact;
+  }
+
+  
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getAdvice()
+   */
+  @Override
+  public String getPrescription() {
+    return 
+    "* Need careful evaluation of why maps are re-executed. \n" +
+      "  * It could be due to some set of unstable cluster nodes.\n" +
+      "  * It could be due application specific failures.";
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getReferenceDetails()
+   */
+  @Override
+  public String getReferenceDetails() {
+    String ref = "* Total Map Tasks: "+this._job.getLongValue(JobKeys.TOTAL_MAPS)+"\n"+
+                 "* Launched Map Tasks: "+this._job.getLongValue(JobKeys.LAUNCHED_MAPS)+"\n"+
+                 "* Percent Maps ReExecuted: "+this._percentMapsReExecuted+"\n"+
+                 "* Impact: "+truncate(this._impact);
+    return ref;
+  }
+}

Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java?rev=724531&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java (added)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java Mon Dec  8 14:45:38 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.vaidya.postexdiagnosis.tests;
+
+import org.apache.hadoop.vaidya.statistics.job.JobStatistics;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.KeyDataType;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.ReduceTaskKeys;
+import org.apache.hadoop.vaidya.statistics.job.ReduceTaskStatistics;
+import org.apache.hadoop.vaidya.DiagnosticTest;
+import org.w3c.dom.Element;
+import java.util.Hashtable;
+import java.util.List;
+
+/**
+ *
+ */
+public class ReadingHDFSFilesAsSideEffect extends DiagnosticTest {
+
+  private double _impact;
+  private JobStatistics _job;
+  
+  
+  
+  /**
+   * 
+   */
+  public ReadingHDFSFilesAsSideEffect() {
+  }
+
+  /*
+   * Evaluate the test    
+   */
+  @Override
+  public double evaluate(JobStatistics job) {
+    
+    /*
+     * Set the this._job
+     */
+    this._job = job;
+    
+    /*
+     * Calculate and return the impact
+     * 
+     * Check if job level aggregate bytes read from HDFS are more than map input bytes
+     * Typically they should be same unless maps and/or reducers are reading some data
+     * from HDFS as a side effect
+     * 
+     * If side effect HDFS bytes read are >= twice map input bytes impact is treated as
+     * maximum.
+     */
+    
+    this._impact = (job.getLongValue(JobKeys.HDFS_BYTES_READ) / job.getLongValue(JobKeys.MAP_INPUT_BYTES));
+    if (this._impact >= 2.0) {
+      this._impact = 1;
+    }
+    else  {
+      this._impact -= 1;
+    }
+    
+    return this._impact;
+  }
+
+  
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getAdvice()
+   */
+  @Override
+  public String getPrescription() {
+    return 
+    "Map and/or Reduce tasks are reading application specific files from HDFS. Make sure the replication factor\n" +
+        "of these HDFS files is high enough to avoid the data reading bottleneck. Typically replication factor\n" +
+        "can be square root of map/reduce tasks capacity of the allocated cluster.";
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getReferenceDetails()
+   */
+  @Override
+  public String getReferenceDetails() {
+    String ref = "* Total HDFS Bytes read: "+this._job.getLongValue(JobKeys.HDFS_BYTES_READ)+"\n"+
+                 "* Total Map Input Bytes read: "+this._job.getLongValue(JobKeys.MAP_INPUT_BYTES)+"\n"+
+                 "* Impact: "+truncate(this._impact);
+    return ref;
+  }
+}

Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReducesReExecutionImpact.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReducesReExecutionImpact.java?rev=724531&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReducesReExecutionImpact.java (added)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReducesReExecutionImpact.java Mon Dec  8 14:45:38 2008
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.vaidya.postexdiagnosis.tests;
+
+import org.apache.hadoop.vaidya.statistics.job.JobStatistics;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.KeyDataType;
+import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.ReduceTaskKeys;
+import org.apache.hadoop.vaidya.statistics.job.ReduceTaskStatistics;
+import org.apache.hadoop.vaidya.DiagnosticTest;
+import org.w3c.dom.Element;
+import java.util.Hashtable;
+import java.util.List;
+
+/**
+ *
+ */
+public class ReducesReExecutionImpact extends DiagnosticTest {
+
+  private double _impact;
+  private JobStatistics _job;
+  private long _percentReducesReExecuted;
+  
+  
+  /**
+   * 
+   */
+  public ReducesReExecutionImpact() {
+  }
+
+  /*
+   * Evaluate the test    
+   */
+  @Override
+  public double evaluate(JobStatistics job) {
+    
+    /*
+     * Set the this._job
+     */
+    this._job = job;
+    
+    /*
+     * Calculate and return the impact
+     */
+    this._impact = ((job.getLongValue(JobKeys.LAUNCHED_REDUCES) - job.getLongValue(JobKeys.TOTAL_REDUCES))/job.getLongValue(JobKeys.TOTAL_REDUCES));
+    this._percentReducesReExecuted = Math.round(this._impact * 100);
+    return this._impact;
+  }
+
+  
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getAdvice()
+   */
+  @Override
+  public String getPrescription() {
+    return 
+    "* Need careful evaluation of why reduce tasks are re-executed. \n" +
+      "  * It could be due to some set of unstable cluster nodes.\n" +
+      "  * It could be due application specific failures.";
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getReferenceDetails()
+   */
+  @Override
+  public String getReferenceDetails() {
+    String ref = 
+      "* Total Reduce Tasks: "+this._job.getLongValue(JobKeys.TOTAL_REDUCES)+"\n"+
+        "* Launched Reduce Tasks: "+this._job.getLongValue(JobKeys.LAUNCHED_REDUCES)+"\n"+
+        "* Percent Reduce Tasks ReExecuted: "+this._percentReducesReExecuted + "\n" +
+        "* Impact: "+truncate(this._impact);
+    return ref;
+  }
+}

Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml?rev=724531&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml (added)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml Mon Dec  8 14:45:38 2008
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+**  
+ -->
+<!-- This is a diagnostic test configuration file. Diagnostic test driver 
+     reads this file to get the list of tests and their configuration information
+     
+   Title            : Provides brief description of the test
+   ClassName        : Provides the fully qualified java class name that implements the test condition
+   Description      : Provides detailed information about the test describing how it checks for a specific
+                        performance problem.
+   SuccessThreshold : (value between [0..1])        
+                    : Evaluating a diagnostic test returns its level of impact on the job
+                      performance. If the impact value (between 0 and 1) is greater than or equal
+                      to the success threshold, the rule has detected the problem (TEST POSITIVE);
+                      otherwise the rule has passed the test (TEST NEGATIVE). The impact level is
+                      calculated and returned by each test's evaluate method. For tests that are
+                      boolean in nature the impact level is either 0 or 1 and the success threshold should be 1.
+   Importance       : Indicates relative importance of this diagnostic test among the set of
+                      diagnostic rules defined in this file. Three declarative values that
+                      can be assigned are High, Medium or Low
+   Prescription     : This is an optional element to store the advice to be included in the report upon test failure.
+                      In the report it is overridden by any advice/prescription text returned by the getPrescription
+                      method of DiagnosticTest.  
+   InputElement     : Input element is made available to the diagnostic test for it to interpret and accept 
+                      any parameters specific to the test. These test specific parameters are used to configure 
+                      the tests without changing the java code.
+-->
+<PostExPerformanceDiagnosisTests>
+
+<DiagnosticTest>
+  <Title><![CDATA[Balanced Reduce Partitioning]]></Title>
+  <ClassName><![CDATA[org.apache.hadoop.vaidya.postexdiagnosis.tests.BalancedReducePartitioning]]></ClassName>
+  <Description><![CDATA[This rule tests as to how well the input to reduce tasks is balanced]]></Description>
+  <Importance><![CDATA[High]]></Importance> 
+  <SuccessThreshold><![CDATA[0.20]]></SuccessThreshold>
+  <Prescription><![CDATA[advice]]></Prescription>
+  <InputElement>
+    <PercentReduceRecords><![CDATA[0.85]]></PercentReduceRecords>
+  </InputElement>
+</DiagnosticTest>
+
+<DiagnosticTest>
+  <Title><![CDATA[Impact of Map tasks Re-Execution]]></Title>
+  <ClassName><![CDATA[org.apache.hadoop.vaidya.postexdiagnosis.tests.MapsReExecutionImpact]]></ClassName>
+  <Description><![CDATA[This test rule checks percentage of map task re-execution impacting the job performance]]></Description>
+  <Importance><![CDATA[High]]></Importance> 
+  <SuccessThreshold><![CDATA[0.40]]></SuccessThreshold>
+  <Prescription><![CDATA[default advice]]></Prescription>
+  <InputElement>
+  </InputElement>
+</DiagnosticTest>
+
+<DiagnosticTest>
+  <Title><![CDATA[Impact of Reduce tasks Re-Execution]]></Title>
+  <ClassName><![CDATA[org.apache.hadoop.vaidya.postexdiagnosis.tests.ReducesReExecutionImpact]]></ClassName>
+  <Description><![CDATA[This test rule checks percentage of reduce task re-execution impacting the job performance]]></Description>
+  <Importance><![CDATA[High]]></Importance> 
+  <SuccessThreshold><![CDATA[0.40]]></SuccessThreshold>
+  <Prescription><![CDATA[default advice]]></Prescription>
+  <InputElement>
+  </InputElement>
+</DiagnosticTest>
+
+<DiagnosticTest>
+  <Title><![CDATA[Map and/or Reduce tasks reading HDFS data as a side effect]]></Title>
+  <ClassName><![CDATA[org.apache.hadoop.vaidya.postexdiagnosis.tests.ReadingHDFSFilesAsSideEffect]]></ClassName>
+  <Description><![CDATA[This test rule checks if map/reduce tasks are reading data from HDFS as a side effect. The more data read as a side effect, the more likely it becomes a bottleneck across parallel execution of map/reduce tasks.]]></Description>
+  <Importance><![CDATA[High]]></Importance> 
+  <SuccessThreshold><![CDATA[0.05]]></SuccessThreshold>
+  <Prescription><![CDATA[default advice]]></Prescription>
+  <InputElement>
+  </InputElement>
+</DiagnosticTest>
+
+<DiagnosticTest>
+  <Title><![CDATA[Map side disk spill]]></Title>
+  <ClassName><![CDATA[org.apache.hadoop.vaidya.postexdiagnosis.tests.MapSideDiskSpill]]></ClassName>
+  <Description><![CDATA[This test rule checks if Map tasks are spilling the data on to the local disk during the map side sorting due to insufficient sort buffer size. The impact is calculated as the ratio of local bytes written to map output bytes. Impact is normalized using the NormalizationFactor given below, and any value greater than or equal to the normalization factor is treated as maximum (i.e. 1). ]]></Description>
+  <Importance><![CDATA[Low]]></Importance> 
+  <SuccessThreshold><![CDATA[0.3]]></SuccessThreshold>
+  <Prescription><![CDATA[default advice]]></Prescription>
+  <InputElement>
+    <NormalizationFactor>3.0</NormalizationFactor>
+  </InputElement>
+</DiagnosticTest>
+
+</PostExPerformanceDiagnosisTests>

Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java?rev=724531&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java (added)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java Mon Dec  8 14:45:38 2008
@@ -0,0 +1,580 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.vaidya.statistics.job;
+
+import java.util.ArrayList;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobHistory;
+import org.apache.hadoop.mapred.JobHistory.JobInfo;
+import org.apache.hadoop.mapred.JobHistory.Keys;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Counters.Counter;
+import java.text.ParseException;
+
+//import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys;
+
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Collections;
+
+/**
+ * Execution statistics for one map/reduce job, built from its parsed job
+ * history ({@link JobInfo}) and its job configuration.
+ *
+ * Job-level values are stored as strings keyed by {@link JobKeys}. Per-task
+ * values are collected into {@link MapTaskStatistics} and
+ * {@link ReduceTaskStatistics} objects. Task lists sorted by a given key are
+ * built lazily and cached; see {@link #getMapTaskList} and
+ * {@link #getReduceTaskList}.
+ */
+public class JobStatistics implements JobStatisticsInterface {
+
+  /*
+   * Pattern for parsing the COUNTERS string. Every match is one maximal run of
+   * non-comma characters, i.e. one "<counter name>:<value>" tuple.
+   */
+  private static final Pattern _pattern = Pattern.compile("[^,]+");
+
+  /*
+   * Counter names, as they appear in a COUNTERS string, mapped to the keys under
+   * which their values are stored. Tuples whose name is not listed here (or
+   * that have no ':'-separated value) are reported as "NOT INCLUDED" and
+   * otherwise ignored.
+   */
+  private static final java.util.Map<String, JobKeys> JOB_COUNTER_KEYS =
+    new java.util.HashMap<String, JobKeys>();
+  private static final java.util.Map<String, MapTaskKeys> MAP_TASK_COUNTER_KEYS =
+    new java.util.HashMap<String, MapTaskKeys>();
+  private static final java.util.Map<String, ReduceTaskKeys> REDUCE_TASK_COUNTER_KEYS =
+    new java.util.HashMap<String, ReduceTaskKeys>();
+
+  static {
+    JOB_COUNTER_KEYS.put("File Systems.Local bytes read", JobKeys.LOCAL_BYTES_READ);
+    JOB_COUNTER_KEYS.put("File Systems.Local bytes written", JobKeys.LOCAL_BYTES_WRITTEN);
+    JOB_COUNTER_KEYS.put("File Systems.HDFS bytes read", JobKeys.HDFS_BYTES_READ);
+    JOB_COUNTER_KEYS.put("File Systems.HDFS bytes written", JobKeys.HDFS_BYTES_WRITTEN);
+    JOB_COUNTER_KEYS.put("Job Counters .Launched map tasks", JobKeys.LAUNCHED_MAPS);
+    JOB_COUNTER_KEYS.put("Job Counters .Launched reduce tasks", JobKeys.LAUNCHED_REDUCES);
+    JOB_COUNTER_KEYS.put("Job Counters .Data-local map tasks", JobKeys.DATALOCAL_MAPS);
+    JOB_COUNTER_KEYS.put("Job Counters .Rack-local map tasks", JobKeys.RACKLOCAL_MAPS);
+    JOB_COUNTER_KEYS.put("Map-Reduce Framework.Map input records", JobKeys.MAP_INPUT_RECORDS);
+    JOB_COUNTER_KEYS.put("Map-Reduce Framework.Map output records", JobKeys.MAP_OUTPUT_RECORDS);
+    JOB_COUNTER_KEYS.put("Map-Reduce Framework.Map input bytes", JobKeys.MAP_INPUT_BYTES);
+    JOB_COUNTER_KEYS.put("Map-Reduce Framework.Map output bytes", JobKeys.MAP_OUTPUT_BYTES);
+    JOB_COUNTER_KEYS.put("Map-Reduce Framework.Combine input records", JobKeys.COMBINE_INPUT_RECORDS);
+    JOB_COUNTER_KEYS.put("Map-Reduce Framework.Combine output records", JobKeys.COMBINE_OUTPUT_RECORDS);
+    JOB_COUNTER_KEYS.put("Map-Reduce Framework.Reduce input groups", JobKeys.REDUCE_INPUT_GROUPS);
+    JOB_COUNTER_KEYS.put("Map-Reduce Framework.Reduce input records", JobKeys.REDUCE_INPUT_RECORDS);
+    JOB_COUNTER_KEYS.put("Map-Reduce Framework.Reduce output records", JobKeys.REDUCE_OUTPUT_RECORDS);
+
+    MAP_TASK_COUNTER_KEYS.put("File Systems.Local bytes read", MapTaskKeys.LOCAL_BYTES_READ);
+    MAP_TASK_COUNTER_KEYS.put("File Systems.Local bytes written", MapTaskKeys.LOCAL_BYTES_WRITTEN);
+    MAP_TASK_COUNTER_KEYS.put("File Systems.HDFS bytes read", MapTaskKeys.HDFS_BYTES_READ);
+    MAP_TASK_COUNTER_KEYS.put("File Systems.HDFS bytes written", MapTaskKeys.HDFS_BYTES_WRITTEN);
+    MAP_TASK_COUNTER_KEYS.put("Map-Reduce Framework.Map input records", MapTaskKeys.INPUT_RECORDS);
+    MAP_TASK_COUNTER_KEYS.put("Map-Reduce Framework.Map output records", MapTaskKeys.OUTPUT_RECORDS);
+    MAP_TASK_COUNTER_KEYS.put("Map-Reduce Framework.Map input bytes", MapTaskKeys.INPUT_BYTES);
+    MAP_TASK_COUNTER_KEYS.put("Map-Reduce Framework.Map output bytes", MapTaskKeys.OUTPUT_BYTES);
+    MAP_TASK_COUNTER_KEYS.put("Map-Reduce Framework.Combine input records", MapTaskKeys.COMBINE_INPUT_RECORDS);
+    MAP_TASK_COUNTER_KEYS.put("Map-Reduce Framework.Combine output records", MapTaskKeys.COMBINE_OUTPUT_RECORDS);
+
+    REDUCE_TASK_COUNTER_KEYS.put("File Systems.Local bytes read", ReduceTaskKeys.LOCAL_BYTES_READ);
+    REDUCE_TASK_COUNTER_KEYS.put("File Systems.Local bytes written", ReduceTaskKeys.LOCAL_BYTES_WRITTEN);
+    REDUCE_TASK_COUNTER_KEYS.put("File Systems.HDFS bytes read", ReduceTaskKeys.HDFS_BYTES_READ);
+    REDUCE_TASK_COUNTER_KEYS.put("File Systems.HDFS bytes written", ReduceTaskKeys.HDFS_BYTES_WRITTEN);
+    REDUCE_TASK_COUNTER_KEYS.put("Map-Reduce Framework.Reduce input records", ReduceTaskKeys.INPUT_RECORDS);
+    REDUCE_TASK_COUNTER_KEYS.put("Map-Reduce Framework.Reduce output records", ReduceTaskKeys.OUTPUT_RECORDS);
+    REDUCE_TASK_COUNTER_KEYS.put("Map-Reduce Framework.Combine input records", ReduceTaskKeys.COMBINE_INPUT_RECORDS);
+    REDUCE_TASK_COUNTER_KEYS.put("Map-Reduce Framework.Combine output records", ReduceTaskKeys.COMBINE_OUTPUT_RECORDS);
+    REDUCE_TASK_COUNTER_KEYS.put("Map-Reduce Framework.Reduce input groups", ReduceTaskKeys.INPUT_GROUPS);
+  }
+
+  /*
+   * Job configuration
+   */
+  private JobConf _jobConf;
+
+  /*
+   * Parsed job history the statistics are built from
+   */
+  private JobHistory.JobInfo _jobInfo;
+
+  /*
+   * Job-level statistics. Values are kept as strings and converted on read.
+   */
+  private java.util.Hashtable<Enum, String> _job;
+
+  /*
+   * Map task list, in the iteration order of JobInfo.getAllTasks()
+   * (presumably ordered by task id -- TODO confirm)
+   */
+  private ArrayList<MapTaskStatistics> _mapTaskList = new ArrayList<MapTaskStatistics>();
+
+  /*
+   * Reduce task list, in the iteration order of JobInfo.getAllTasks()
+   * (presumably ordered by task id -- TODO confirm)
+   */
+  private ArrayList<ReduceTaskStatistics> _reduceTaskList = new ArrayList<ReduceTaskStatistics>();
+
+  /**
+   * @param jobConf the jobConf to set
+   */
+  void setJobConf(JobConf jobConf) {
+    this._jobConf = jobConf;
+    // TODO: Add job conf to _job array
+  }
+
+  /**
+   * @return the job configuration given to the constructor or set via setJobConf
+   */
+  public JobConf getJobConf() {
+    return this._jobConf;
+  }
+
+  /*
+   * Get a job value of type long.
+   * Throws NumberFormatException if the key has no value or the value is not a long.
+   */
+  public long getLongValue(Enum key) {
+    return Long.parseLong(this._job.get(key));
+  }
+
+  /*
+   * Get a job value of type double.
+   * Throws NullPointerException if the key has no value, or
+   * NumberFormatException if the value is not a double.
+   */
+  public double getDoubleValue(Enum key) {
+    return Double.parseDouble(this._job.get(key));
+  }
+
+  /*
+   * Get a job value as a String (null if the key has no value).
+   */
+  public String getStringValue(Enum key) {
+    return this._job.get(key);
+  }
+
+  /*
+   * Set key value of type long (stored in its decimal string form).
+   */
+  public void setValue(Enum key, long value) {
+    this._job.put(key, Long.toString(value));
+  }
+
+  /*
+   * Set key value of type double (stored as Double.toString(value)).
+   */
+  public void setValue(Enum key, double value) {
+    this._job.put(key, Double.toString(value));
+  }
+
+  /*
+   * Set key value of type String. The backing Hashtable rejects null, so a
+   * null key or value throws NullPointerException.
+   */
+  public void setValue(Enum key, String value) {
+    this._job.put(key, value);
+  }
+
+  /*
+   * Ctor: builds job-level values and the map/reduce task lists from jobInfo.
+   */
+  public JobStatistics (JobConf jobConf, JobInfo jobInfo) throws ParseException {
+    this._jobConf = jobConf;
+    this._jobInfo = jobInfo;
+    this._job = new Hashtable<Enum, String>();
+    populate_Job(this._job, this._jobInfo.getValues());
+    populate_MapReduceTaskLists(this._mapTaskList, this._reduceTaskList, this._jobInfo.getAllTasks());
+  }
+
+  /*
+   * Dispatch every task of the job by TASK_TYPE. MAP and REDUCE tasks are
+   * converted into task statistics; CLEANUP tasks are ignored; any other
+   * (or missing) type is reported on stdout.
+   */
+  private void populate_MapReduceTaskLists (ArrayList<MapTaskStatistics> mapTaskList,
+                              ArrayList<ReduceTaskStatistics> reduceTaskList,
+                              java.util.Map<String, JobHistory.Task> taskMap) throws ParseException {
+    for (JobHistory.Task task : taskMap.values()) {
+      // Constant-first equals: a task without a TASK_TYPE must not throw NPE.
+      String taskType = task.get(Keys.TASK_TYPE);
+      if ("MAP".equals(taskType)) {
+        populateMapTask(mapTaskList, task);
+      } else if ("REDUCE".equals(taskType)) {
+        populateReduceTask(reduceTaskList, task);
+      } else if ("CLEANUP".equals(taskType)) {
+        //System.out.println("INFO: IGNORING TASK TYPE : "+task.get(Keys.TASK_TYPE));
+      } else {
+        System.out.println("UNKNOWN TASK TYPE : "+taskType);
+      }
+    }
+  }
+
+  /*
+   * Return a copy of the task's values overlaid with the values of its
+   * successful attempt, if there is one. Working on a copy means this class
+   * never writes into the map returned by task.getValues(), which may be the
+   * task's own storage inside the JobInfo.
+   */
+  private java.util.Map<JobHistory.Keys, String> mergedTaskValues(JobHistory.Task task) {
+    java.util.Map<JobHistory.Keys, String> values =
+      new java.util.HashMap<JobHistory.Keys, String>(task.getValues());
+    java.util.Map<JobHistory.Keys, String> successTaskAttemptMap = getLastSuccessfulTaskAttempt(task);
+    if (successTaskAttemptMap != null) {
+      values.putAll(successTaskAttemptMap);
+    } else {
+      // The task is still processed below, but it only reaches a task list if
+      // its own values carry COUNTERS.
+      System.out.println("Task:<"+task.get(Keys.TASKID)+"> is not successful - SKIPPING");
+    }
+    return values;
+  }
+
+  /*
+   * Convert one MAP task into MapTaskStatistics. The task is appended to
+   * mapTaskList only when a COUNTERS value is present.
+   */
+  private void populateMapTask(ArrayList<MapTaskStatistics> mapTaskList, JobHistory.Task task) {
+    MapTaskStatistics mapT = new MapTaskStatistics();
+    for (Map.Entry<JobHistory.Keys, String> mtc : mergedTaskValues(task).entrySet()) {
+      JobHistory.Keys key = mtc.getKey();
+      String value = mtc.getValue();
+      switch (key) {
+      case TASKID: mapT.setValue(MapTaskKeys.TASK_ID, value); break;
+      case TASK_ATTEMPT_ID: mapT.setValue(MapTaskKeys.ATTEMPT_ID, value); break;
+      case HOSTNAME: mapT.setValue(MapTaskKeys.HOSTNAME, value); break;
+      case TASK_TYPE: mapT.setValue(MapTaskKeys.TASK_TYPE, value); break;
+      case TASK_STATUS: mapT.setValue(MapTaskKeys.STATUS, value); break;
+      case START_TIME: mapT.setValue(MapTaskKeys.START_TIME, value); break;
+      case FINISH_TIME: mapT.setValue(MapTaskKeys.FINISH_TIME, value); break;
+      case SPLITS: mapT.setValue(MapTaskKeys.SPLITS, value); break;
+      case COUNTERS:
+        parseAndAddMapTaskCounters(mapT, value);
+        mapTaskList.add(mapT);
+        break;
+      default: System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR MAP COUNTERS");
+        break;
+      }
+    }
+
+    // Add number of task attempts
+    mapT.setValue(MapTaskKeys.NUM_ATTEMPTS, Integer.toString(task.getTaskAttempts().size()));
+  }
+
+  /*
+   * Convert one REDUCE task into ReduceTaskStatistics. The task is appended to
+   * reduceTaskList only when a COUNTERS value is present.
+   */
+  private void populateReduceTask(ArrayList<ReduceTaskStatistics> reduceTaskList, JobHistory.Task task) {
+    ReduceTaskStatistics reduceT = new ReduceTaskStatistics();
+    for (Map.Entry<JobHistory.Keys, String> rtc : mergedTaskValues(task).entrySet()) {
+      JobHistory.Keys key = rtc.getKey();
+      String value = rtc.getValue();
+      switch (key) {
+      case TASKID: reduceT.setValue(ReduceTaskKeys.TASK_ID, value); break;
+      case TASK_ATTEMPT_ID: reduceT.setValue(ReduceTaskKeys.ATTEMPT_ID, value); break;
+      case HOSTNAME: reduceT.setValue(ReduceTaskKeys.HOSTNAME, value); break;
+      case TASK_TYPE: reduceT.setValue(ReduceTaskKeys.TASK_TYPE, value); break;
+      case TASK_STATUS: reduceT.setValue(ReduceTaskKeys.STATUS, value); break;
+      case START_TIME: reduceT.setValue(ReduceTaskKeys.START_TIME, value); break;
+      case FINISH_TIME: reduceT.setValue(ReduceTaskKeys.FINISH_TIME, value); break;
+      case SHUFFLE_FINISHED: reduceT.setValue(ReduceTaskKeys.SHUFFLE_FINISH_TIME, value); break;
+      case SORT_FINISHED: reduceT.setValue(ReduceTaskKeys.SORT_FINISH_TIME, value); break;
+      case COUNTERS:
+        parseAndAddReduceTaskCounters(reduceT, value);
+        reduceTaskList.add(reduceT);
+        break;
+      default: System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE COUNTERS");
+        break;
+      }
+    }
+
+    // Add number of task attempts (once, after all values are copied, as for maps)
+    reduceT.setValue(ReduceTaskKeys.NUM_ATTEMPTS, Integer.toString(task.getTaskAttempts().size()));
+  }
+
+  /*
+   * Return the values of the first attempt (in task.getTaskAttempts()
+   * iteration order) whose TASK_STATUS is "SUCCESS", or null if there is none.
+   * Despite the name, no ordering by time is applied.
+   */
+  private java.util.Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(JobHistory.Task task) {
+    for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
+      // CHECK_IT: Only one SUCCESSFUL TASK ATTEMPT
+      java.util.Map<JobHistory.Keys, String> attemptValues = attempt.getValues();
+      // Constant-first equals: attempts without a TASK_STATUS are skipped, not an NPE.
+      if ("SUCCESS".equals(attemptValues.get(JobHistory.Keys.TASK_STATUS))) {
+        return attemptValues;
+      }
+    }
+    return null;
+  }
+
+  /*
+   * Populate the job stats from the job-level history values.
+   * START_TIME has no JobKeys counterpart and is reported as not included.
+   */
+  private void populate_Job (Hashtable<Enum, String> job, java.util.Map<JobHistory.Keys, String> jobC) throws ParseException {
+    for (Map.Entry<JobHistory.Keys, String> entry : jobC.entrySet()) {
+      JobHistory.Keys key = entry.getKey();
+      String value = entry.getValue();
+      switch (key) {
+      case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID, value); break;
+      case FINISH_TIME: job.put(JobKeys.FINISH_TIME, value); break;
+      case JOBID: job.put(JobKeys.JOBID, value); break;
+      case JOBNAME: job.put(JobKeys.JOBNAME, value); break;
+      case USER: job.put(JobKeys.USER, value); break;
+      case JOBCONF: job.put(JobKeys.JOBCONF, value); break;
+      case SUBMIT_TIME: job.put(JobKeys.SUBMIT_TIME, value); break;
+      case LAUNCH_TIME: job.put(JobKeys.LAUNCH_TIME, value); break;
+      case TOTAL_MAPS: job.put(JobKeys.TOTAL_MAPS, value); break;
+      case TOTAL_REDUCES: job.put(JobKeys.TOTAL_REDUCES, value); break;
+      case FAILED_MAPS: job.put(JobKeys.FAILED_MAPS, value); break;
+      case FAILED_REDUCES: job.put(JobKeys.FAILED_REDUCES, value); break;
+      case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS, value); break;
+      case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES, value); break;
+      case JOB_STATUS: job.put(JobKeys.STATUS, value); break;
+      case COUNTERS:
+        parseAndAddJobCounters(job, value);
+        break;
+      default:   System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR COUNTERS");
+               break;
+      }
+    }
+  }
+
+  /*
+   * Parse and add the job counters. A tuple is stored only when it splits on
+   * ':' into a known counter name and a value; anything else is reported.
+   */
+  private void parseAndAddJobCounters(Hashtable<Enum, String> job, String counters) throws ParseException {
+    Matcher m = _pattern.matcher(counters);
+    while (m.find()) {
+      String ctuple = m.group(0);
+      String[] parts = ctuple.split(":");
+      // Length check guards malformed tuples such as ":" or "name:".
+      JobKeys key = (parts.length >= 2) ? JOB_COUNTER_KEYS.get(parts[0]) : null;
+      if (key != null) {
+        job.put(key, parts[1]);
+      } else {
+        System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR");
+      }
+    }
+  }
+
+  /*
+   * Parse and add the Map task counters (same tuple rules as the job counters).
+   */
+  private void parseAndAddMapTaskCounters(MapTaskStatistics mapTask, String counters) {
+    Matcher m = _pattern.matcher(counters);
+    while (m.find()) {
+      String ctuple = m.group(0);
+      String[] parts = ctuple.split(":");
+      MapTaskKeys key = (parts.length >= 2) ? MAP_TASK_COUNTER_KEYS.get(parts[0]) : null;
+      if (key != null) {
+        mapTask.setValue(key, parts[1]);
+      } else {
+        System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
+      }
+    }
+  }
+
+  /*
+   * Parse and add the reduce task counters (same tuple rules as the job counters).
+   */
+  private void parseAndAddReduceTaskCounters(ReduceTaskStatistics reduceTask, String counters) {
+    Matcher m = _pattern.matcher(counters);
+    while (m.find()) {
+      String ctuple = m.group(0);
+      String[] parts = ctuple.split(":");
+      ReduceTaskKeys key = (parts.length >= 2) ? REDUCE_TASK_COUNTER_KEYS.get(parts[0]) : null;
+      if (key != null) {
+        reduceTask.setValue(key, parts[1]);
+      } else {
+        System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE TASK");
+      }
+    }
+  }
+
+  /*
+   * Print the Job Execution Statistics: job values, then every map task, then
+   * every reduce task.
+   * TODO: split to print job, map/reduce task list and individual map/reduce task stats
+   */
+  public void printJobExecutionStatistics() {
+    System.out.println("JOB COUNTERS *********************************************");
+    for (Map.Entry<Enum, String> entry : this._job.entrySet()) {
+      System.out.println("Key:<" + entry.getKey().name() + ">, value:<"+ entry.getValue() +">");
+    }
+
+    System.out.println("MAP COUNTERS *********************************************");
+    for (MapTaskStatistics mapTask : this._mapTaskList) {
+      System.out.println("MAP TASK *********************************************");
+      mapTask.printKeys();
+    }
+
+    // Iterate the reduce list by its own size (previously bounded by the map list).
+    System.out.println("REDUCE COUNTERS *********************************************");
+    for (ReduceTaskStatistics reduceTask : this._reduceTaskList) {
+      System.out.println("REDUCE TASK *********************************************");
+      reduceTask.printKeys();
+    }
+  }
+
+  /*
+   * Hash table keeping sorted lists of map tasks based on the specific map task key
+   */
+  private Hashtable <Enum, ArrayList<MapTaskStatistics>> _sortedMapTaskListsByKey = new Hashtable<Enum, ArrayList<MapTaskStatistics>>();
+
+  /*
+   * @return mapTaskList : ArrayList of MapTaskStatistics sorted by mapTaskSortKey
+   * @param mapTaskSortKey : Specific counter key used for sorting the task list
+   * @param dataType : indicates the data type of the counter key used for sorting
+   * If sort key is null then by default map tasks are sorted using map task ids.
+   * The sorted list is cached per sort key, and the same (mutable) list instance
+   * is returned by later calls. The dataType of the first call for a key decides
+   * the order of the cached list.
+   */
+  public synchronized ArrayList<MapTaskStatistics>
+          getMapTaskList(Enum mapTaskSortKey, KeyDataType dataType) {
+    if (mapTaskSortKey == null) {
+      mapTaskSortKey = MapTaskKeys.TASK_ID;
+    }
+    ArrayList<MapTaskStatistics> sorted = this._sortedMapTaskListsByKey.get(mapTaskSortKey);
+    if (sorted == null) {
+      // Sort a shallow copy so _mapTaskList keeps its original order.
+      sorted = this.sortMapTasksByKey(new ArrayList<MapTaskStatistics>(this._mapTaskList), mapTaskSortKey, dataType);
+      this._sortedMapTaskListsByKey.put(mapTaskSortKey, sorted);
+    }
+    return sorted;
+  }
+
+  /*
+   * Sort mapTasks in place by key and return the same list.
+   */
+  private ArrayList<MapTaskStatistics> sortMapTasksByKey (ArrayList<MapTaskStatistics> mapTasks,
+                         Enum key, Enum dataType) {
+    Collections.sort(mapTasks, new MapCounterComparator(key, dataType));
+    return mapTasks;
+  }
+
+  /*
+   * Orders map tasks by one key: numerically when dataType is KeyDataType.LONG,
+   * otherwise by case-insensitive string comparison.
+   */
+  private static class MapCounterComparator implements Comparator<MapTaskStatistics> {
+
+    private final Enum _sortKey;
+    private final Enum _dataType;
+
+    public MapCounterComparator(Enum key, Enum dataType) {
+      this._sortKey = key;
+      this._dataType = dataType;
+    }
+
+    public int compare(MapTaskStatistics a, MapTaskStatistics b) {
+      if (this._dataType == KeyDataType.LONG) {
+        long aa = a.getLongValue(this._sortKey);
+        long bb = b.getLongValue(this._sortKey);
+        return (aa < bb) ? -1 : ((aa == bb) ? 0 : 1);
+      }
+      return a.getStringValue(this._sortKey).compareToIgnoreCase(b.getStringValue(this._sortKey));
+    }
+  }
+
+  /*
+   * Reduce Array List sorting
+   */
+  private Hashtable <Enum, ArrayList<ReduceTaskStatistics>> _sortedReduceTaskListsByKey = new Hashtable<Enum,ArrayList<ReduceTaskStatistics>>();
+
+  /*
+   * @return reduceTaskList : ArrayList of ReduceTaskStatistics sorted by reduceTaskSortKey
+   * @param reduceTaskSortKey : Specific counter key used for sorting the task list
+   * @param dataType : indicates the data type of the counter key used for sorting
+   * If sort key is null then, by default reduce tasks are sorted using task ids.
+   * The sorted list is cached per sort key, and the same (mutable) list instance
+   * is returned by later calls. The dataType of the first call for a key decides
+   * the order of the cached list.
+   */
+  public synchronized ArrayList<ReduceTaskStatistics>
+                                getReduceTaskList (Enum reduceTaskSortKey, KeyDataType dataType) {
+    if (reduceTaskSortKey == null) {
+      reduceTaskSortKey = ReduceTaskKeys.TASK_ID;
+    }
+    ArrayList<ReduceTaskStatistics> sorted = this._sortedReduceTaskListsByKey.get(reduceTaskSortKey);
+    if (sorted == null) {
+      // Sort a shallow copy so _reduceTaskList keeps its original order.
+      sorted = this.sortReduceTasksByKey(new ArrayList<ReduceTaskStatistics>(this._reduceTaskList), reduceTaskSortKey, dataType);
+      this._sortedReduceTaskListsByKey.put(reduceTaskSortKey, sorted);
+    }
+    return sorted;
+  }
+
+  /*
+   * Sort reduceTasks in place by key and return the same list.
+   */
+  private ArrayList<ReduceTaskStatistics> sortReduceTasksByKey (ArrayList<ReduceTaskStatistics> reduceTasks,
+                                Enum key, Enum dataType) {
+    Collections.sort(reduceTasks, new ReduceCounterComparator(key, dataType));
+    return reduceTasks;
+  }
+
+  /*
+   * Orders reduce tasks by one key: numerically when dataType is
+   * KeyDataType.LONG, otherwise by case-insensitive string comparison.
+   */
+  private static class ReduceCounterComparator implements Comparator<ReduceTaskStatistics> {
+
+    private final Enum _sortKey;
+    private final Enum _dataType;  //either long or string
+
+    public ReduceCounterComparator(Enum key, Enum dataType) {
+      this._sortKey = key;
+      this._dataType = dataType;
+    }
+
+    public int compare(ReduceTaskStatistics a, ReduceTaskStatistics b) {
+      if (this._dataType == KeyDataType.LONG) {
+        long aa = a.getLongValue(this._sortKey);
+        long bb = b.getLongValue(this._sortKey);
+        return (aa < bb) ? -1 : ((aa == bb) ? 0 : 1);
+      }
+      return a.getStringValue(this._sortKey).compareToIgnoreCase(b.getStringValue(this._sortKey));
+    }
+  }
+}



Mime
View raw message