Return-Path:
Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org
Received: (qmail 74659 invoked from network); 8 Dec 2008 22:46:11 -0000
Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 8 Dec 2008 22:46:11 -0000
Received: (qmail 74215 invoked by uid 500); 8 Dec 2008 22:46:23 -0000
Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org
Received: (qmail 74171 invoked by uid 500); 8 Dec 2008 22:46:23 -0000
Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: core-dev@hadoop.apache.org
Delivered-To: mailing list core-commits@hadoop.apache.org
Received: (qmail 74162 invoked by uid 99); 8 Dec 2008 22:46:23 -0000
Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Dec 2008 14:46:23 -0800
X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED
X-Spam-Check-By: apache.org
Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Dec 2008 22:44:51 +0000
Received: by eris.apache.org (Postfix, from userid 65534) id 548AE2388979; Mon, 8 Dec 2008 14:45:40 -0800 (PST)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r724531 [1/2] - in /hadoop/core/trunk: ./ src/contrib/vaidya/ src/contrib/vaidya/src/ src/contrib/vaidya/src/java/ src/contrib/vaidya/src/java/org/ src/contrib/vaidya/src/java/org/apache/ src/contrib/vaidya/src/java/org/apache/hadoop/ src/c...
Date: Mon, 08 Dec 2008 22:45:39 -0000
To: core-commits@hadoop.apache.org
From: omalley@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20081208224540.548AE2388979@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: omalley
Date: Mon Dec 8 14:45:38 2008
New Revision: 724531

URL: http://svn.apache.org/viewvc?rev=724531&view=rev
Log:
HADOOP-4179. Add Vaidya tool to analyze map/reduce job logs for performance problems.
(Suhas Gogate via omalley) Added: hadoop/core/trunk/src/contrib/vaidya/ hadoop/core/trunk/src/contrib/vaidya/build.xml hadoop/core/trunk/src/contrib/vaidya/src/ hadoop/core/trunk/src/contrib/vaidya/src/java/ hadoop/core/trunk/src/contrib/vaidya/src/java/org/ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/DiagnosticTest.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/JobDiagnoser.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/PostExPerformanceDiagnoser.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/BalancedReducePartitioning.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapsReExecutionImpact.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReducesReExecutionImpact.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatisticsInterface.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/MapTaskStatistics.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/ReduceTaskStatistics.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/TaskStatistics.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/util/ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/util/XMLUtils.java hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/vaidya.sh hadoop/core/trunk/src/docs/src/documentation/content/xdocs/vaidya.xml Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/src/docs/src/documentation/content/xdocs/site.xml Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=724531&r1=724530&r2=724531&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Mon Dec 8 14:45:38 2008 @@ -64,14 +64,19 @@ (szetszwo) HADOOP-4709. Add several new features and bug fixes to Chukwa. 
- (Jerome Boulon, Eric Yang, Andy Konwinski, Ariel Rabkin via cdouglas) - Added Hadoop Infrastructure Care Center (UI for visualize data collected by Chukwa) + Added Hadoop Infrastructure Care Center (UI for visualize data collected + by Chukwa) Added FileAdaptor for streaming small file in one chunk Added compression to archive and demux output - Added unit tests and validation for agent, collector, and demux map reduce job - Added database loader for loading demux output (sequence file) to jdbc connected database + Added unit tests and validation for agent, collector, and demux map + reduce job + Added database loader for loading demux output (sequence file) to jdbc + connected database Added algorithm to distribute collector load more evenly + (Jerome Boulon, Eric Yang, Andy Konwinski, Ariel Rabkin via cdouglas) + HADOOP-4179. Add Vaidya tool to analyze map/reduce job logs for performanc + problems. (Suhas Gogate via omalley) IMPROVEMENTS Added: hadoop/core/trunk/src/contrib/vaidya/build.xml URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/build.xml?rev=724531&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/vaidya/build.xml (added) +++ hadoop/core/trunk/src/contrib/vaidya/build.xml Mon Dec 8 14:45:38 2008 @@ -0,0 +1,68 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/DiagnosticTest.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/DiagnosticTest.java?rev=724531&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/DiagnosticTest.java (added) +++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/DiagnosticTest.java Mon Dec 8 14:45:38 2008 @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.vaidya; + +import java.lang.Runnable; +import org.apache.hadoop.vaidya.statistics.job.*; +import org.apache.hadoop.vaidya.util.*; +import org.w3c.dom.Node; +import org.w3c.dom.Document; +import org.w3c.dom.NodeList; +import org.w3c.dom.Element; + +/* + * This is an abstract base class to be extended by each diagnostic test + * class. It implements Runnable interface so that if required multiple tests + * can be run in parallel. 
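+ *
+ * A concrete test overrides evaluate(), getPrescription() and
+ * getReferenceDetails(). evaluate() returns an impact value in the range
+ * [0..1]; run() then marks the test POSITIVE (failed) when that impact
+ * reaches the SuccessThreshold configured for the test.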
+ */ +public abstract class DiagnosticTest implements Runnable { + + private static final double HIGHVAL = 0.99; + private static final double MEDIUMVAL = 0.66; + private static final double LOWVAL = 0.33; + + /* + * Job statistics are passed to this class against which this diagnostic + * test is evaluated. + */ + private JobStatistics _jobExecutionStats; + private Element _testConfigElement; + private double _impactLevel; + private boolean _evaluated; + private boolean _testPassed; + + /* + * Checks if test is already evaluated against job execution statistics + * @return - true if test is already evaluated once. + */ + public boolean isEvaluated() { + return _evaluated; + } + + /* + * If impact level (returned by evaluate method) is less than success threshold + * then test is passed (NEGATIVE) else failed (POSITIVE) which inturn indicates the + * problem with job performance + */ + public boolean istestPassed() { + return this._testPassed; + } + + + /* + * Initialize the globals + */ + public void initGlobals (JobStatistics jobExecutionStats, Element testConfigElement) { + this._jobExecutionStats = jobExecutionStats; + this._testConfigElement = testConfigElement; + } + + /* + * Returns a prescription/advice (formated text) based on the evaluation of + * diagnostic test condition (evaluate method). Individual test should override + * and implement it. If the value returned is null then the prescription advice + * is printed as provided in the test config file. + */ + public abstract String getPrescription(); + + /* + * This method prints any reference details to support the test result. Individual + * test needs to override and implement it and information printed is specific + * to individual test. + */ + public abstract String getReferenceDetails (); + + /* + * Evaluates diagnostic condition and returns impact level (value [0..1]) + * Typically this method calculates the impact of a diagnosed condition on the job performance + * (Note: for boolean conditions it is either 0 or 1). + */ + public abstract double evaluate (JobStatistics jobExecutionStats); + + /* + * Get the Title information for this test as set in the test config file + */ + public String getTitle() throws Exception { + return XMLUtils.getElementValue("Title", this._testConfigElement); + } + + /* + * Get the Description information as set in the test config file. + */ + public String getDescription() throws Exception { + return XMLUtils.getElementValue("Description", this._testConfigElement); + } + + /* + * Get the Importance value as set in the test config file. + */ + public double getImportance() throws Exception { + if (XMLUtils.getElementValue("Importance", this._testConfigElement).equalsIgnoreCase("high")) { + return HIGHVAL; + } else if (XMLUtils.getElementValue("Importance", this._testConfigElement).equalsIgnoreCase("medium")) { + return MEDIUMVAL; + } else { + return LOWVAL; + } + } + + /* + * Returns the impact level of this test condition. This value is calculated and + * returned by evaluate method. + */ + public double getImpactLevel() throws Exception { + if (!this.isEvaluated()) { + throw new Exception("Test has not been evaluated"); + } + return truncate(this._impactLevel); + } + + /* + * Get the severity level as specified in the test config file. + */ + public double getSeverityLevel() throws Exception { + return truncate ((double)(getImportance()*getImpactLevel())); + } + + /* + * Get Success Threshold as specified in the test config file. 
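+ * A test whose impact level is greater than or equal to this threshold is
+ * reported as POSITIVE (failed); below the threshold the test passes.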
+ */ + public double getSuccessThreshold() throws Exception { + double x = Double.parseDouble(XMLUtils.getElementValue("SuccessThreshold", this._testConfigElement)); + return truncate (x); + } + + /* + * Creates and returns the report element for this test based on the + * test evaluation results. + */ + public Element getReportElement(Document doc, Node parent) throws Exception { + /* + * If test is not evaluated yet then throw exception + */ + if (!this.isEvaluated()) { + throw new Exception("Test has not been evaluated"); + } + + /* + * Construct and return the report element + */ + // Insert Child ReportElement + Node reportElement = doc.createElement("TestReportElement"); + parent.appendChild(reportElement); + + // Insert title + Node item = doc.createElement("TestTitle"); + reportElement.appendChild(item); + Node value = doc.createTextNode(this.getTitle()); + item.appendChild(value); + + // Insert description + item = doc.createElement("TestDescription"); + reportElement.appendChild(item); + value = doc.createTextNode(this.getDescription()); + item.appendChild(value); + + // Insert Importance + item = doc.createElement("TestImportance"); + reportElement.appendChild(item); + String imp; + if (this.getImportance() == HIGHVAL) { + imp = "HIGH"; + } else if (this.getImportance() == MEDIUMVAL) { + imp = "MEDIUM"; + } else { + imp = "LOW"; + } + value = doc.createTextNode(imp); + item.appendChild(value); + + // Insert Importance + item = doc.createElement("TestResult"); + reportElement.appendChild(item); + if (this._testPassed) { + value = doc.createTextNode("NEGATIVE(PASSED)"); + } else { + value = doc.createTextNode("POSITIVE(FAILED)"); + } + item.appendChild(value); + + // TODO : if (!this._testPassed) { + // Insert Severity + item = doc.createElement("TestSeverity"); + reportElement.appendChild(item); + value = doc.createTextNode(""+this.getSeverityLevel()); + item.appendChild(value); + + // Insert Reference Details + item = doc.createElement("ReferenceDetails"); + reportElement.appendChild(item); + value = doc.createTextNode(""+this.getReferenceDetails()); + item.appendChild(value); + + // Insert Prescription Advice + item = doc.createElement("TestPrescription"); + String val = this.getPrescription(); + if (val == null) { + val = XMLUtils.getElementValue("Prescription", this._testConfigElement); + } + reportElement.appendChild(item); + value = doc.createTextNode(""+val); + item.appendChild(value); + // } + return (Element)reportElement; + } + + + /* + * (non-Javadoc) + * @see java.lang.Runnable#run() + */ + public void run() { + /* + * Evaluate the test + */ + this._impactLevel = this.evaluate(this._jobExecutionStats); + this._evaluated = true; + try { + if (this._impactLevel >= this.getSuccessThreshold()) { + this._testPassed = false; + } else { + this._testPassed = true; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + /* + * Returns value of element of type long part of InputElement of diagnostic + * rule + */ + protected long getInputElementLongValue (String elementName, long defaultValue) { + Element inputElement = (Element)(this._testConfigElement.getElementsByTagName("InputElement").item(0)); + Element prs = null; long value; + prs = (Element)inputElement.getElementsByTagName(elementName).item(0); + if (prs != null) { + value = Long.parseLong(prs.getFirstChild().getNodeValue().trim()); + } else { + value = defaultValue; + } + return value; + } + + /* + * Returns value of element of type double part of InputElement of diagnostic rule + */ + protected double 
getInputElementDoubleValue(String elementName, double defaultValue) { + Element inputElement = (Element)(this._testConfigElement.getElementsByTagName("InputElement").item(0)); + Element prs = null; double value; + prs = (Element)inputElement.getElementsByTagName(elementName).item(0); + if (prs != null) { + value = Double.parseDouble(prs.getFirstChild().getNodeValue().trim()); + } else { + value = defaultValue; + } + return value; + } + + /* + * Returns value of element of type String part of InputElement of diagnostic rule + */ + protected String getInputElementStringValue(String elementName, String defaultValue) { + Element inputElement = (Element)(this._testConfigElement.getElementsByTagName("InputElement").item(0)); + Element prs = null; String value; + prs = (Element)inputElement.getElementsByTagName(elementName).item(0); + if (prs != null) { + value = prs.getFirstChild().getNodeValue().trim(); + } else { + value = defaultValue; + } + return value; + } + + /* + * truncate doubles to 2 digit. + */ + public static double truncate(double x) + { + long y=(long)(x*100); + return (double)y/100; + } +} Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/JobDiagnoser.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/JobDiagnoser.java?rev=724531&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/JobDiagnoser.java (added) +++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/JobDiagnoser.java Mon Dec 8 14:45:38 2008 @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.vaidya; + +import org.apache.hadoop.vaidya.util.XMLUtils; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + + +/** + * This is a base driver class for job diagnostics. Various specialty drivers that + * tests specific aspects of job problems e.g. PostExPerformanceDiagnoser extends the + * this base class. + * + */ +public class JobDiagnoser { + + /* + * XML document containing report elements, one for each rule tested + */ + private Document _report; + + /* + * @report : returns report document + */ + public Document getReport() { + return this._report; + } + + + /** + * Constructor. It initializes the report document. 
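+ * It creates a single root element, PostExPerformanceDiagnosticReport,
+ * intended to hold the per-test report elements produced by
+ * DiagnosticTest.getReportElement().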
+ */ + public JobDiagnoser () throws Exception { + + /* + * Initialize the report document, make it ready to add the child report elements + */ + DocumentBuilder builder = null; + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + try{ + builder = factory.newDocumentBuilder(); + this._report = builder.newDocument(); + } catch (ParserConfigurationException e) { + e.printStackTrace(); + } + + // Insert Root Element + Element root = (Element) this._report.createElement("PostExPerformanceDiagnosticReport"); + this._report.appendChild(root); + } + + /* + * Print the report document to console + */ + public void printReport() { + XMLUtils.printDOM(this._report); + } + + /* + * Save the report document the specified report file + * @param reportfile : path of report file. + */ + public void saveReport(String filename) { + XMLUtils.writeXmlToFile(filename, this._report); + } +} Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/PostExPerformanceDiagnoser.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/PostExPerformanceDiagnoser.java?rev=724531&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/PostExPerformanceDiagnoser.java (added) +++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/PostExPerformanceDiagnoser.java Mon Dec 8 14:45:38 2008 @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.vaidya.postexdiagnosis; + + +import java.net.URL; +import java.io.InputStream; +import java.io.FileInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobHistory.JobInfo; +import org.apache.hadoop.mapred.DefaultJobHistoryParser; +import org.apache.hadoop.vaidya.util.XMLUtils; +import org.apache.hadoop.vaidya.DiagnosticTest; +import org.apache.hadoop.vaidya.JobDiagnoser; +import org.apache.hadoop.vaidya.statistics.job.JobStatistics; +import org.w3c.dom.NodeList; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + + +/** + * This class acts as a driver or rule engine for executing the post execution + * performance diagnostics tests of a map/reduce job. It prints or saves the + * diagnostic report as a xml document. 
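+ *
+ * Inputs are the job configuration file (job_xxxx_conf.xml) and the job
+ * history log file, both given as URLs; diagnostic rules are read from
+ * postex_diagnosis_tests.xml unless an alternate test configuration file
+ * is supplied.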
+ */ +public class PostExPerformanceDiagnoser extends JobDiagnoser { + + private String _jobHistoryFile = null; + private InputStream _testsConfFileIs = null; + private String _reportFile = null; + private String _jobConfFile = null; + + /* + * Data available for analysts to write post execution performance diagnostic rules + */ + private JobStatistics _jobExecutionStatistics; + + /* + * Get the report file where diagnostic report is to be saved + */ + public String getReportFile () { + return this._reportFile; + } + + /* + * Get the job history log file used in collecting the job counters + */ + public String getJobHistoryFile () { + return this._jobHistoryFile; + } + + /* + * Get the test configuration file where all the diagnostic tests are registered + * with their configuration information. + */ + public InputStream getTestsConfFileIs () { + return this._testsConfFileIs; + } + + /* + * Set the test configuration file + */ + public void setTestsConfFileIs (InputStream testsConfFileIs) { + this._testsConfFileIs = testsConfFileIs; + } + + /** + * @return JobStatistics - Object storing the job configuration and execution + * counters and statistics information + */ + public JobStatistics getJobExecutionStatistics() { + return _jobExecutionStatistics; + } + + /** + * @param jobConfFile - URL pointing to job configuration (job_conf.xml) file + * @param jobHistoryLogFile - URL pointing to job history log file + * @param testsConfFile - file path for test configuration file (optional). + * If not specified default path is:$HADOOP_HOME/contrib/vaidya/pxpd_tests_config.xml + * @param reportFile - file path for storing report (optional) + */ + public PostExPerformanceDiagnoser (String jobConfFile, String jobHistoryFile, InputStream testsConfFileIs, + String reportFile) throws Exception { + + this._jobHistoryFile = jobHistoryFile; + this._testsConfFileIs = testsConfFileIs; + this._reportFile = reportFile; + this._jobConfFile = jobConfFile; + + /* + * Read the job information necessary for post performance analysis + */ + JobConf jobConf = new JobConf(); + JobInfo jobInfo = new JobInfo(""); + readJobInformation(jobConf, jobInfo); + this._jobExecutionStatistics = new JobStatistics(jobConf, jobInfo); + } + + /** + * read and populate job statistics information. + */ + private void readJobInformation(JobConf jobConf, JobInfo jobInfo) throws Exception { + + /* + * Convert the input strings to URL + */ + URL jobConfFileUrl = new URL(this._jobConfFile); + URL jobHistoryFileUrl = new URL (this._jobHistoryFile); + + /* + * Read the Job Configuration from the jobConfFile url + */ + jobConf.addResource(jobConfFileUrl); + + /* + * Read JobHistoryFile and build job counters to evaluate diagnostic rules + */ + if (jobHistoryFileUrl.getProtocol().equals("hdfs")) { + DefaultJobHistoryParser.parseJobTasks (jobHistoryFileUrl.getPath(), jobInfo, FileSystem.get(jobConf)); + } else if (jobHistoryFileUrl.getProtocol().equals("file")) { + DefaultJobHistoryParser.parseJobTasks (jobHistoryFileUrl.getPath(), jobInfo, FileSystem.getLocal(jobConf)); + } else { + throw new Exception("Malformed URL. Protocol: "+jobHistoryFileUrl.getProtocol()); + } + } + + /* + * print Help + */ + private static void printHelp() { + System.out.println("Usage:"); + System.out.println("PostExPerformanceDiagnoser -jobconf -joblog [-testconf ] [-report ]"); + System.out.println(); + System.out.println("-jobconf : File path for job configuration file (e.g. job_xxxx_conf.xml). It can be on HDFS or"); + System.out.println(" : local file system. 
It should be specified in the URL format."); + System.out.println(" : e.g. local file => file://localhost/Users/hadoop-user/job_0001_conf.xml"); + System.out.println(" : e.g. hdfs file => hdfs://namenode:port/Users/hadoop-user/hodlogs/.../job_0001_conf.xml"); + System.out.println(); + System.out.println("-joblog : File path for job history log file. It can be on HDFS or local file system."); + System.out.println(" : It should be specified in the URL format."); + System.out.println(); + System.out.println("-testconf : Optional file path for performance advisor tests configuration file. It should be available"); + System.out.println(" : on local file system and be specified as as an absolute file path."); + System.out.println(" : e.g. => /Users/hadoop-user/postex_diagnosis_tests.xml. If not specified default file will be used"); + System.out.println(" : from the hadoop-{ver}-vaidya.jar in a classpath."); + System.out.println(" : For user to view or make local copy of default tests, file is available at $HADOOP_HOME/contrib/vaidya/conf/postex_diagnosis_tests.xml"); + System.out.println(); + System.out.println("-report : Optional file path for for storing diagnostic report in a XML format. Path should be available"); + System.out.println(" : on local file system and be specified as as an absolute file path."); + System.out.println(" : e.g. => /Users/hadoop-user/postex_diagnosis_report.xml. If not specified report will be printed on console"); + System.out.println(); + System.out.println("-help : prints this usage"); + System.out.println(); + } + + /** + * @param args + */ + public static void main(String[] args) { + + String jobconffile = null; + String joblogfile = null; + InputStream testsconffileis = null; + String reportfile = null; + + /* + * Parse the command line arguments + */ + try { + for (int i=0; i srTaskList = + jobExecutionStats.getReduceTaskList(ReduceTaskKeys.INPUT_RECORDS, KeyDataType.LONG); + this.percentReduceRecordsSize = (long) (this.percent * jobExecutionStats.getLongValue(JobKeys.REDUCE_INPUT_RECORDS)); + this.totalReduces = jobExecutionStats.getLongValue(JobKeys.TOTAL_REDUCES); + long tempReduceRecordsCount = 0; + this.busyReducers = 0; + for (int i=srTaskList.size()-1; i>-1; i--) { + tempReduceRecordsCount += srTaskList.get(i).getLongValue(ReduceTaskKeys.INPUT_RECORDS); + this.busyReducers++; + if (tempReduceRecordsCount >= this.percentReduceRecordsSize) { + break; + } + } + + // Calculate Impact + return this.impact = (1 - (double)this.busyReducers/(double)this.totalReduces); + + } + + /* + * helper function to print specific reduce counter for all reduce tasks + */ + public void printReduceCounters (List> x, ReduceTaskKeys key) { + for (int i=0; i"); + } + } + + /* + * + */ + @Override + public String getPrescription() { + return + "* Use the appropriate partitioning function"+ "\n" + + "* For streaming job consider following partitioner and hadoop config parameters\n"+ + " * org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner\n" + + " * -jobconf stream.map.output.field.separator, -jobconf stream.num.map.output.key.fields"; + } + + /* + */ + @Override + public String getReferenceDetails() { + String ref = + "* TotalReduceTasks: "+this.totalReduces+"\n"+ + "* BusyReduceTasks processing "+this.percent+ "% of total records: " +this.busyReducers+"\n"+ + "* Impact: "+truncate(this.impact); + return ref; + } +} Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java URL: 
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java?rev=724531&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java (added) +++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapSideDiskSpill.java Mon Dec 8 14:45:38 2008 @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.vaidya.postexdiagnosis.tests; + +import org.apache.hadoop.vaidya.statistics.job.JobStatistics; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.KeyDataType; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.MapTaskKeys; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.ReduceTaskKeys; +import org.apache.hadoop.vaidya.statistics.job.MapTaskStatistics; +import org.apache.hadoop.vaidya.DiagnosticTest; +import org.w3c.dom.Element; +import java.util.Hashtable; +import java.util.List; + +/** + * + */ +public class MapSideDiskSpill extends DiagnosticTest { + + private double _impact; + private JobStatistics _job; + private long _numLocalBytesWrittenByMaps; + + /** + * + */ + public MapSideDiskSpill() { + } + + /* + * + */ + @Override + public double evaluate(JobStatistics job) { + + /* + * Set the this._job + */ + this._job = job; + + /* + * Read the Normalization Factor + */ + double normF = getInputElementDoubleValue("NormalizationFactor", 3.0); + + /* + * Get the sorted reduce task list by number MapTaskKeys.OUTPUT_BYTES + */ + List srTaskList = job.getMapTaskList(MapTaskKeys.LOCAL_BYTES_WRITTEN, KeyDataType.LONG); + int size = srTaskList.size(); + long numLocalBytesWrittenByMaps = 0; + for (int i=0; i 0) { + this._impact = (this._numLocalBytesWrittenByMaps - job.getLongValue(JobKeys.MAP_OUTPUT_BYTES))/job.getLongValue(JobKeys.MAP_OUTPUT_BYTES); + } else { + this._impact = this._numLocalBytesWrittenByMaps/job.getLongValue(JobKeys.MAP_OUTPUT_BYTES); + } + + if (this._impact > normF) { + this._impact = 1.0; + } else { + this._impact = this._impact/normF; + } + + return this._impact; + + } + + /* (non-Javadoc) + * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getAdvice() + */ + @Override + public String getPrescription() { + return + "* Use combiner to lower the map output size.\n" + + "* Increase map side sort buffer size (io.sort.mb:"+this._job.getJobConf().getInt("io.sort.mb", 0) + ").\n" + + "* Increase index buffer size (io.sort.record.percent:"+ 
this._job.getJobConf().getInt("io.sort.record.percent", 0) + ") if number of Map Output Records are large. \n" + + "* Increase (io.sort.spill.percent:"+ this._job.getJobConf().getInt("io.sort.spill.percent", 0) + "), default 0.80 i.e. 80% of sort buffer size & index buffer size. \n"; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getReferenceDetails() + */ + @Override + public String getReferenceDetails() { + String ref = + "* TotalMapOutputBytes: "+this._job.getLongValue(JobKeys.MAP_OUTPUT_BYTES)+"\n"+ + "* Total Local Bytes Written by Maps: "+this._numLocalBytesWrittenByMaps+"\n"+ + "* Impact: "+ truncate(this._impact); + return ref; + } +} Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapsReExecutionImpact.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapsReExecutionImpact.java?rev=724531&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapsReExecutionImpact.java (added) +++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/MapsReExecutionImpact.java Mon Dec 8 14:45:38 2008 @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.vaidya.postexdiagnosis.tests; + +import org.apache.hadoop.vaidya.statistics.job.JobStatistics; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.KeyDataType; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.ReduceTaskKeys; +import org.apache.hadoop.vaidya.statistics.job.ReduceTaskStatistics; +import org.apache.hadoop.vaidya.DiagnosticTest; +import org.w3c.dom.Element; +import java.util.Hashtable; +import java.util.List; + +/** + * + */ +public class MapsReExecutionImpact extends DiagnosticTest { + + private double _impact; + private JobStatistics _job; + private long _percentMapsReExecuted; + + + /** + * + */ + public MapsReExecutionImpact() { + } + + /* + * Evaluate the test + */ + @Override + public double evaluate(JobStatistics job) { + + /* + * Set the this._job + */ + this._job = job; + + /* + * Calculate and return the impact + */ + this._impact = ((job.getLongValue(JobKeys.LAUNCHED_MAPS) - job.getLongValue(JobKeys.TOTAL_MAPS))/job.getLongValue(JobKeys.TOTAL_MAPS)); + this._percentMapsReExecuted = Math.round(this._impact * 100); + return this._impact; + } + + + /* (non-Javadoc) + * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getAdvice() + */ + @Override + public String getPrescription() { + return + "* Need careful evaluation of why maps are re-executed. \n" + + " * It could be due to some set of unstable cluster nodes.\n" + + " * It could be due application specific failures."; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getReferenceDetails() + */ + @Override + public String getReferenceDetails() { + String ref = "* Total Map Tasks: "+this._job.getLongValue(JobKeys.TOTAL_MAPS)+"\n"+ + "* Launched Map Tasks: "+this._job.getLongValue(JobKeys.LAUNCHED_MAPS)+"\n"+ + "* Percent Maps ReExecuted: "+this._percentMapsReExecuted+"\n"+ + "* Impact: "+truncate(this._impact); + return ref; + } +} Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java?rev=724531&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java (added) +++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java Mon Dec 8 14:45:38 2008 @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.vaidya.postexdiagnosis.tests; + +import org.apache.hadoop.vaidya.statistics.job.JobStatistics; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.KeyDataType; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.ReduceTaskKeys; +import org.apache.hadoop.vaidya.statistics.job.ReduceTaskStatistics; +import org.apache.hadoop.vaidya.DiagnosticTest; +import org.w3c.dom.Element; +import java.util.Hashtable; +import java.util.List; + +/** + * + */ +public class ReadingHDFSFilesAsSideEffect extends DiagnosticTest { + + private double _impact; + private JobStatistics _job; + + + + /** + * + */ + public ReadingHDFSFilesAsSideEffect() { + } + + /* + * Evaluate the test + */ + @Override + public double evaluate(JobStatistics job) { + + /* + * Set the this._job + */ + this._job = job; + + /* + * Calculate and return the impact + * + * Check if job level aggregate bytes read from HDFS are more than map input bytes + * Typically they should be same unless maps and/or reducers are reading some data + * from HDFS as a side effect + * + * If side effect HDFS bytes read are >= twice map input bytes impact is treated as + * maximum. + */ + + this._impact = (job.getLongValue(JobKeys.HDFS_BYTES_READ) / job.getLongValue(JobKeys.MAP_INPUT_BYTES)); + if (this._impact >= 2.0) { + this._impact = 1; + } + else { + this._impact -= 1; + } + + return this._impact; + } + + + /* (non-Javadoc) + * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getAdvice() + */ + @Override + public String getPrescription() { + return + "Map and/or Reduce tasks are reading application specific files from HDFS. Make sure the replication factor\n" + + "of these HDFS files is high enough to avoid the data reading bottleneck. Typically replication factor\n" + + "can be square root of map/reduce tasks capacity of the allocated cluster."; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getReferenceDetails() + */ + @Override + public String getReferenceDetails() { + String ref = "* Total HDFS Bytes read: "+this._job.getLongValue(JobKeys.HDFS_BYTES_READ)+"\n"+ + "* Total Map Input Bytes read: "+this._job.getLongValue(JobKeys.MAP_INPUT_BYTES)+"\n"+ + "* Impact: "+truncate(this._impact); + return ref; + } +} Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReducesReExecutionImpact.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReducesReExecutionImpact.java?rev=724531&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReducesReExecutionImpact.java (added) +++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReducesReExecutionImpact.java Mon Dec 8 14:45:38 2008 @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.vaidya.postexdiagnosis.tests; + +import org.apache.hadoop.vaidya.statistics.job.JobStatistics; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.KeyDataType; +import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.ReduceTaskKeys; +import org.apache.hadoop.vaidya.statistics.job.ReduceTaskStatistics; +import org.apache.hadoop.vaidya.DiagnosticTest; +import org.w3c.dom.Element; +import java.util.Hashtable; +import java.util.List; + +/** + * + */ +public class ReducesReExecutionImpact extends DiagnosticTest { + + private double _impact; + private JobStatistics _job; + private long _percentReducesReExecuted; + + + /** + * + */ + public ReducesReExecutionImpact() { + } + + /* + * Evaluate the test + */ + @Override + public double evaluate(JobStatistics job) { + + /* + * Set the this._job + */ + this._job = job; + + /* + * Calculate and return the impact + */ + this._impact = ((job.getLongValue(JobKeys.LAUNCHED_REDUCES) - job.getLongValue(JobKeys.TOTAL_REDUCES))/job.getLongValue(JobKeys.TOTAL_REDUCES)); + this._percentReducesReExecuted = Math.round(this._impact * 100); + return this._impact; + } + + + /* (non-Javadoc) + * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getAdvice() + */ + @Override + public String getPrescription() { + return + "* Need careful evaluation of why reduce tasks are re-executed. 
\n" + + " * It could be due to some set of unstable cluster nodes.\n" + + " * It could be due application specific failures."; + } + + /* (non-Javadoc) + * @see org.apache.hadoop.contrib.utils.perfadvisor.diagnostic_rules.DiagnosticRule#getReferenceDetails() + */ + @Override + public String getReferenceDetails() { + String ref = + "* Total Reduce Tasks: "+this._job.getLongValue(JobKeys.TOTAL_REDUCES)+"\n"+ + "* Launched Reduce Tasks: "+this._job.getLongValue(JobKeys.LAUNCHED_REDUCES)+"\n"+ + "* Percent Reduce Tasks ReExecuted: "+this._percentReducesReExecuted + "\n" + + "* Impact: "+truncate(this._impact); + return ref; + } +} Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml?rev=724531&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml (added) +++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml Mon Dec 8 14:45:38 2008 @@ -0,0 +1,104 @@ + + + + + + + <![CDATA[Balanaced Reduce Partitioning]]> + + + + + + + + + + + + <![CDATA[Impact of Map tasks Re-Execution]]> + + + + + + + + + + + <![CDATA[Impact of Reduce tasks Re-Execution]]> + + + + + + + + + + + <![CDATA[Map and/or Reduce tasks reading HDFS data as a side effect]]> + + + + + + + + + + + <![CDATA[Map side disk spill]]> + + + + + + + 3.0 + + + + Added: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java?rev=724531&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java (added) +++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java Mon Dec 8 14:45:38 2008 @@ -0,0 +1,580 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.vaidya.statistics.job; + +import java.util.ArrayList; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobHistory; +import org.apache.hadoop.mapred.JobHistory.JobInfo; +import org.apache.hadoop.mapred.JobHistory.Keys; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.Counters.Counter; +import java.text.ParseException; + +//import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys; + +import java.util.Hashtable; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.regex.Matcher; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Collections; + +/** + * + */ +public class JobStatistics implements JobStatisticsInterface { + + + /* + * Pattern for parsing the COUNTERS + */ + private static final Pattern _pattern = Pattern.compile("[[^,]?]+"); //"[[^,]?]+" + + /* + * Job configuration + */ + private JobConf _jobConf; + + /** + * @param jobConf the jobConf to set + */ + void setJobConf(JobConf jobConf) { + this._jobConf = jobConf; + // TODO: Add job conf to _job array + } + + /* + * Aggregated Job level counters + */ + private JobHistory.JobInfo _jobInfo; + + /* + * Job stats + */ + private java.util.Hashtable _job; + + /** + * @param jobConf the jobConf to set + */ + public JobConf getJobConf() { + return this._jobConf; + } + + /* + * Get Job Counters of type long + */ + public long getLongValue(Enum key) { + return Long.parseLong(this._job.get(key)); + } + + /* + * Get job Counters of type Double + */ + public double getDoubleValue(Enum key) { + return Double.parseDouble(this._job.get(key)); + } + + /* + * Get Job Counters of type String + */ + public String getStringValue(Enum key) { + return this._job.get(key); + } + + /* + * Set key value of type long + */ + public void setValue(Enum key, long value) { + this._job.put(key, Long.toString(value)); + } + + /* + * Set key value of type double + */ + public void setValue(Enum key, double value) { + this._job.put(key, Double.toString(value)); + } + + /* + * Set key value of type String + */ + public void setValue(Enum key, String value) { + this._job.put(key, value); + } + + /* + * Map Task List (Sorted by task id) + */ + private ArrayList _mapTaskList = new ArrayList(); + + /* + * Reduce Task List (Sorted by task id) + */ + private ArrayList _reduceTaskList = new ArrayList(); + + + /* + * Ctor: + */ + public JobStatistics (JobConf jobConf, JobInfo jobInfo) throws ParseException { + this._jobConf = jobConf; + this._jobInfo = jobInfo; + this._job = new Hashtable(); + populate_Job(this._job, this._jobInfo.getValues()); + populate_MapReduceTaskLists(this._mapTaskList, this._reduceTaskList, this._jobInfo.getAllTasks()); + } + + /* + * + */ + private void populate_MapReduceTaskLists (ArrayList mapTaskList, + ArrayList reduceTaskList, + java.util.Map taskMap) throws ParseException { + /* + * + */ + int num_tasks = taskMap.entrySet().size(); + java.util.Iterator> ti = taskMap.entrySet().iterator(); + for (int i = 0; i < num_tasks; i++) + { + Map.Entry entry = (Map.Entry) ti.next(); + JobHistory.Task task = entry.getValue(); + if (task.get(Keys.TASK_TYPE).equals("MAP")) { + MapTaskStatistics mapT = new MapTaskStatistics(); + java.util.Map mapTask = task.getValues(); + java.util.Map successTaskAttemptMap = getLastSuccessfulTaskAttempt(task); + // NOTE: Following would lead to less number of actual tasks collected in the tasklist array + if (successTaskAttemptMap != null) { + 
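+ // Overlay the last successful attempt's keys/counters onto the
+ // task-level values before they are parsed below.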
mapTask.putAll(successTaskAttemptMap); + } else { + System.out.println("Task:<"+task.get(Keys.TASKID)+"> is not successful - SKIPPING"); + } + int size = mapTask.size(); + java.util.Iterator> kv = mapTask.entrySet().iterator(); + for (int j = 0; j < size; j++) + { + Map.Entry mtc = kv.next(); + JobHistory.Keys key = mtc.getKey(); + String value = mtc.getValue(); + switch (key) { + case TASKID: mapT.setValue(MapTaskKeys.TASK_ID, value); break; + case TASK_ATTEMPT_ID: mapT.setValue(MapTaskKeys.ATTEMPT_ID, value); break; + case HOSTNAME: mapT.setValue(MapTaskKeys.HOSTNAME, value); break; + case TASK_TYPE: mapT.setValue(MapTaskKeys.TASK_TYPE, value); break; + case TASK_STATUS: mapT.setValue(MapTaskKeys.STATUS, value); break; + case START_TIME: mapT.setValue(MapTaskKeys.START_TIME, value); break; + case FINISH_TIME: mapT.setValue(MapTaskKeys.FINISH_TIME, value); break; + case SPLITS: mapT.setValue(MapTaskKeys.SPLITS, value); break; + case COUNTERS: + value.concat(","); + parseAndAddMapTaskCounters(mapT, value); + mapTaskList.add(mapT); + break; + default: System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR MAP COUNTERS"); + break; + } + } + + // Add number of task attempts + mapT.setValue(MapTaskKeys.NUM_ATTEMPTS, (new Integer(task.getTaskAttempts().size())).toString()); + + }else if (task.get(Keys.TASK_TYPE).equals("REDUCE")) { + ReduceTaskStatistics reduceT = new ReduceTaskStatistics(); + java.util.Map reduceTask = task.getValues(); + java.util.Map successTaskAttemptMap = getLastSuccessfulTaskAttempt(task); + // NOTE: Following would lead to less number of actual tasks collected in the tasklist array + if (successTaskAttemptMap != null) { + reduceTask.putAll(successTaskAttemptMap); + } else { + System.out.println("Task:<"+task.get(Keys.TASKID)+"> is not successful - SKIPPING"); + } + int size = reduceTask.size(); + java.util.Iterator> kv = reduceTask.entrySet().iterator(); + for (int j = 0; j < size; j++) + { + Map.Entry rtc = kv.next(); + JobHistory.Keys key = rtc.getKey(); + String value = rtc.getValue(); + switch (key) { + case TASKID: reduceT.setValue(ReduceTaskKeys.TASK_ID, value); break; + case TASK_ATTEMPT_ID: reduceT.setValue(ReduceTaskKeys.ATTEMPT_ID, value); break; + case HOSTNAME: reduceT.setValue(ReduceTaskKeys.HOSTNAME, value); break; + case TASK_TYPE: reduceT.setValue(ReduceTaskKeys.TASK_TYPE, value); break; + case TASK_STATUS: reduceT.setValue(ReduceTaskKeys.STATUS, value); break; + case START_TIME: reduceT.setValue(ReduceTaskKeys.START_TIME, value); break; + case FINISH_TIME: reduceT.setValue(ReduceTaskKeys.FINISH_TIME, value); break; + case SHUFFLE_FINISHED: reduceT.setValue(ReduceTaskKeys.SHUFFLE_FINISH_TIME, value); break; + case SORT_FINISHED: reduceT.setValue(ReduceTaskKeys.SORT_FINISH_TIME, value); break; + case COUNTERS: + value.concat(","); + parseAndAddReduceTaskCounters(reduceT, value); + reduceTaskList.add(reduceT); + break; + default: System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE COUNTERS"); + break; + } + + // Add number of task attempts + reduceT.setValue(ReduceTaskKeys.NUM_ATTEMPTS, (new Integer(task.getTaskAttempts().size())).toString()); + } + } else if (task.get(Keys.TASK_TYPE).equals("CLEANUP")) { + //System.out.println("INFO: IGNORING TASK TYPE : "+task.get(Keys.TASK_TYPE)); + } else { + System.out.println("UNKNOWN TASK TYPE : "+task.get(Keys.TASK_TYPE)); + } + } + } + + /* + * Get last successful task attempt to be added in the stats + */ + private java.util.Map 
getLastSuccessfulTaskAttempt(JobHistory.Task task) { + + Map taskAttempts = task.getTaskAttempts(); + int size = taskAttempts.size(); + java.util.Iterator> kv = taskAttempts.entrySet().iterator(); + for (int i=0; i tae = kv.next(); + JobHistory.TaskAttempt attempt = tae.getValue(); + if (attempt.getValues().get(JobHistory.Keys.TASK_STATUS).equals("SUCCESS")) { + return attempt.getValues(); + } + } + + return null; + } + + /* + * Popuate the job stats + */ + private void populate_Job (Hashtable job, java.util.Map jobC) throws ParseException { + int size = jobC.size(); + java.util.Iterator> kv = jobC.entrySet().iterator(); + for (int i = 0; i < size; i++) + { + Map.Entry entry = (Map.Entry) kv.next(); + JobHistory.Keys key = entry.getKey(); + String value = entry.getValue(); + switch (key) { + case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID, value); break; + //case START_TIME: job.put(JobKeys., value); break; + case FINISH_TIME: job.put(JobKeys.FINISH_TIME, value); break; + case JOBID: job.put(JobKeys.JOBID, value); break; + case JOBNAME: job.put(JobKeys.JOBNAME, value); break; + case USER: job.put(JobKeys.USER, value); break; + case JOBCONF: job.put(JobKeys.JOBCONF, value); break; + case SUBMIT_TIME: job.put(JobKeys.SUBMIT_TIME, value); break; + case LAUNCH_TIME: job.put(JobKeys.LAUNCH_TIME, value); break; + case TOTAL_MAPS: job.put(JobKeys.TOTAL_MAPS, value); break; + case TOTAL_REDUCES: job.put(JobKeys.TOTAL_REDUCES, value); break; + case FAILED_MAPS: job.put(JobKeys.FAILED_MAPS, value); break; + case FAILED_REDUCES: job.put(JobKeys.FAILED_REDUCES, value); break; + case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS, value); break; + case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES, value); break; + case JOB_STATUS: job.put(JobKeys.STATUS, value); break; + case COUNTERS: + value.concat(","); + parseAndAddJobCounters(job, value); + break; + default: System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR COUNTERS"); + break; + } + } + } + + /* + * Parse and add the job counters + */ + private void parseAndAddJobCounters(Hashtable job, String counters) throws ParseException { + Matcher m = _pattern.matcher(counters); + while(m.find()){ + String ctuple = m.group(0); + //String ctuple = c1tuple.substring(0, c1tuple.length()-1); + String []parts = ctuple.split(":"); + if (parts[0].equals("File Systems.Local bytes read")) { + job.put(JobKeys.LOCAL_BYTES_READ, parts[1]); + } else if (parts[0].equals("File Systems.Local bytes written")) { + job.put(JobKeys.LOCAL_BYTES_WRITTEN, parts[1]); + } else if (parts[0].equals("File Systems.HDFS bytes read")) { + job.put(JobKeys.HDFS_BYTES_READ, parts[1]); + } else if (parts[0].equals("File Systems.HDFS bytes written")) { + job.put(JobKeys.HDFS_BYTES_WRITTEN, parts[1]); + } else if (parts[0].equals("Job Counters .Launched map tasks")) { + job.put(JobKeys.LAUNCHED_MAPS, parts[1]); + } else if (parts[0].equals("Job Counters .Launched reduce tasks")) { + job.put(JobKeys.LAUNCHED_REDUCES, parts[1]); + } else if (parts[0].equals("Job Counters .Data-local map tasks")) { + job.put(JobKeys.DATALOCAL_MAPS, parts[1]); + } else if (parts[0].equals("Job Counters .Rack-local map tasks")) { + job.put(JobKeys.RACKLOCAL_MAPS, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Map input records")) { + job.put(JobKeys.MAP_INPUT_RECORDS, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Map output records")) { + job.put(JobKeys.MAP_OUTPUT_RECORDS, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Map input 
bytes")) { + job.put(JobKeys.MAP_INPUT_BYTES, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) { + job.put(JobKeys.MAP_OUTPUT_BYTES, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) { + job.put(JobKeys.COMBINE_INPUT_RECORDS, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) { + job.put(JobKeys.COMBINE_OUTPUT_RECORDS, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) { + job.put(JobKeys.REDUCE_INPUT_GROUPS, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) { + job.put(JobKeys.REDUCE_INPUT_RECORDS, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) { + job.put(JobKeys.REDUCE_OUTPUT_RECORDS, parts[1]); + } else { + System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR"); + } + } + } + + /* + * Parse and add the Map task counters + */ + private void parseAndAddMapTaskCounters(MapTaskStatistics mapTask, String counters) { + Matcher m = _pattern.matcher(counters); + while(m.find()){ + String ctuple = m.group(0); + //String ctuple = c1tuple.substring(0, c1tuple.length()-1); + String []parts = ctuple.split(":"); + if (parts[0].equals("File Systems.Local bytes read")) { + mapTask.setValue(MapTaskKeys.LOCAL_BYTES_READ, parts[1]); + } else if (parts[0].equals("File Systems.Local bytes written")) { + mapTask.setValue(MapTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]); + } else if (parts[0].equals("File Systems.HDFS bytes read")) { + mapTask.setValue(MapTaskKeys.HDFS_BYTES_READ, parts[1]); + } else if (parts[0].equals("File Systems.HDFS bytes written")) { + mapTask.setValue(MapTaskKeys.HDFS_BYTES_WRITTEN, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Map input records")) { + mapTask.setValue(MapTaskKeys.INPUT_RECORDS, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Map output records")) { + mapTask.setValue(MapTaskKeys.OUTPUT_RECORDS, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) { + mapTask.setValue(MapTaskKeys.INPUT_BYTES, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) { + mapTask.setValue(MapTaskKeys.OUTPUT_BYTES, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) { + mapTask.setValue(MapTaskKeys.COMBINE_INPUT_RECORDS, parts[1]); + } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) { + mapTask.setValue(MapTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]); + } else { + System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK"); + } + } + } + + /* + * Parse and add the reduce task counters + */ + private void parseAndAddReduceTaskCounters(ReduceTaskStatistics reduceTask, String counters) { + Matcher m = _pattern.matcher(counters); + while(m.find()){ + String ctuple = m.group(0); + //String ctuple = c1tuple.substring(0, c1tuple.length()-1); + String []parts = ctuple.split(":"); + if (parts[0].equals("File Systems.Local bytes read")) { + reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_READ, parts[1]); + } else if (parts[0].equals("File Systems.Local bytes written")) { + reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]); + } else if (parts[0].equals("File Systems.HDFS bytes read")) { + reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_READ, parts[1]); + } else if (parts[0].equals("File Systems.HDFS bytes written")) { + reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_WRITTEN, 
+      } else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
+        reduceTask.setValue(ReduceTaskKeys.INPUT_RECORDS, parts[1]);
+      } else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
+        reduceTask.setValue(ReduceTaskKeys.OUTPUT_RECORDS, parts[1]);
+      } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
+        reduceTask.setValue(ReduceTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
+      } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
+        reduceTask.setValue(ReduceTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
+      } else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
+        reduceTask.setValue(ReduceTaskKeys.INPUT_GROUPS, parts[1]);
+      } else {
+        System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE TASK");
+      }
+    }
+  }
+
+  /*
+   * Print the Job Execution Statistics
+   * TODO: split to print job, map/reduce task list and individual map/reduce task stats
+   */
+  public void printJobExecutionStatistics() {
+    /*
+     * Print Job Counters
+     */
+    System.out.println("JOB COUNTERS *********************************************");
+    int size = this._job.size();
+    java.util.Iterator<Map.Entry<Enum, String>> kv = this._job.entrySet().iterator();
+    for (int i = 0; i < size; i++)
+    {
+      Map.Entry<Enum, String> entry = (Map.Entry<Enum, String>) kv.next();
+      Enum key = entry.getKey();
+      String value = entry.getValue();
+      System.out.println("Key:<" + key.name() + ">, value:<"+ value +">");
+    }
+    /*
+     * Print Map task counters
+     */
+    System.out.println("MAP COUNTERS *********************************************");
+    int size1 = this._mapTaskList.size();
+    for (int i = 0; i < size1; i++)
+    {
+      System.out.println("MAP TASK *********************************************");
+      this._mapTaskList.get(i).printKeys();
+    }
+    /*
+     * Print Reduce task counters
+     */
+    System.out.println("REDUCE COUNTERS *********************************************");
+    int size2 = this._reduceTaskList.size();
+    for (int i = 0; i < size2; i++)
+    {
+      System.out.println("REDUCE TASK *********************************************");
+      this._reduceTaskList.get(i).printKeys();
+    }
+  }
+
+  /*
+   * Hash table keeping sorted lists of map tasks based on the specific map task key
+   */
+  private Hashtable<Enum, ArrayList<MapTaskStatistics>> _sortedMapTaskListsByKey = new Hashtable<Enum, ArrayList<MapTaskStatistics>>();
+
+  /*
+   * @return mapTaskList : ArrayList of MapTaskStatistics
+   * @param mapTaskSortKey : Specific counter key used for sorting the task list
+   * @param dataType : indicates the data type of the counter key used for sorting
+   * If sort key is null then by default map tasks are sorted using map task ids.
+   */
+  public synchronized ArrayList<MapTaskStatistics>
+      getMapTaskList(Enum mapTaskSortKey, KeyDataType dataType) {
+
+    /*
+     * If mapTaskSortKey is null then use the task id as a key.
+     */
+    if (mapTaskSortKey == null) {
+      mapTaskSortKey = MapTaskKeys.TASK_ID;
+    }
+
+    if (this._sortedMapTaskListsByKey.get(mapTaskSortKey) == null) {
+      ArrayList<MapTaskStatistics> newList = (ArrayList<MapTaskStatistics>)this._mapTaskList.clone();
+      this._sortedMapTaskListsByKey.put(mapTaskSortKey, this.sortMapTasksByKey(newList, mapTaskSortKey, dataType));
+    }
+    return this._sortedMapTaskListsByKey.get(mapTaskSortKey);
+  }
+
+  private ArrayList<MapTaskStatistics> sortMapTasksByKey (ArrayList<MapTaskStatistics> mapTasks,
+      Enum key, Enum dataType) {
+    MapCounterComparator mcc = new MapCounterComparator(key, dataType);
+    Collections.sort (mapTasks, mcc);
+    return mapTasks;
+  }
+
+  private class MapCounterComparator implements Comparator<MapTaskStatistics> {
+
+    public Enum _sortKey;
+    public Enum _dataType;
+
+    public MapCounterComparator(Enum key, Enum dataType) {
+      this._sortKey = key;
+      this._dataType = dataType;
+    }
+
+    // Comparator interface requires defining compare method.
+    public int compare(MapTaskStatistics a, MapTaskStatistics b) {
+      if (this._dataType == KeyDataType.LONG) {
+        long aa = a.getLongValue(this._sortKey);
+        long bb = b.getLongValue(this._sortKey);
+        if (aa < bb) return -1;
+        else if (aa > bb) return 1;
+      } else {
+        return a.getStringValue(this._sortKey).compareToIgnoreCase(b.getStringValue(this._sortKey));
+      }
+
+      return 0;
+    }
+  }
+
+  /*
+   * Reduce Array List sorting
+   */
+  private Hashtable<Enum, ArrayList<ReduceTaskStatistics>> _sortedReduceTaskListsByKey = new Hashtable<Enum, ArrayList<ReduceTaskStatistics>>();
+
+  /*
+   * @return reduceTaskList : ArrayList of ReduceTaskStatistics
+   * @param reduceTaskSortKey : Specific counter key used for sorting the task list
+   * @param dataType : indicates the data type of the counter key used for sorting
+   * If sort key is null then, by default, reduce tasks are sorted using task ids.
+   */
+  public synchronized ArrayList<ReduceTaskStatistics>
+      getReduceTaskList (Enum reduceTaskSortKey, KeyDataType dataType) {
+
+    /*
+     * If reduceTaskSortKey is null then use the task id as a key.
+     */
+    if (reduceTaskSortKey == null) {
+      reduceTaskSortKey = ReduceTaskKeys.TASK_ID;
+    }
+
+    if (this._sortedReduceTaskListsByKey.get(reduceTaskSortKey) == null) {
+      ArrayList<ReduceTaskStatistics> newList = (ArrayList<ReduceTaskStatistics>)this._reduceTaskList.clone();
+      this._sortedReduceTaskListsByKey.put(reduceTaskSortKey, this.sortReduceTasksByKey(newList, reduceTaskSortKey, dataType));
+    }
+
+    return this._sortedReduceTaskListsByKey.get(reduceTaskSortKey);
+  }
+
+  private ArrayList<ReduceTaskStatistics> sortReduceTasksByKey (ArrayList<ReduceTaskStatistics> reduceTasks,
+      Enum key, Enum dataType) {
+    ReduceCounterComparator rcc = new ReduceCounterComparator(key, dataType);
+    Collections.sort (reduceTasks, rcc);
+    return reduceTasks;
+  }
+
+  private class ReduceCounterComparator implements Comparator<ReduceTaskStatistics> {
+
+    public Enum _sortKey;
+    public Enum _dataType; //either long or string
+
+    public ReduceCounterComparator(Enum key, Enum dataType) {
+      this._sortKey = key;
+      this._dataType = dataType;
+    }
+
+    // Comparator interface requires defining compare method.
+    public int compare(ReduceTaskStatistics a, ReduceTaskStatistics b) {
+      if (this._dataType == KeyDataType.LONG) {
+        long aa = a.getLongValue(this._sortKey);
+        long bb = b.getLongValue(this._sortKey);
+        if (aa < bb) return -1;
+        else if (aa > bb) return 1;
+      } else {
+        return a.getStringValue(this._sortKey).compareToIgnoreCase(b.getStringValue(this._sortKey));
+      }
+
+      return 0;
+    }
+  }
+}
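A minimal usage sketch of the statistics API added above (illustrative only, not part of this change; it assumes a JobStatistics instance named "jobStats" has already been constructed by the diagnoser, and the surrounding driver code is hypothetical):

    // Hypothetical caller: list map tasks ordered by finish time, then dump job-level counters.
    ArrayList<MapTaskStatistics> byFinishTime =
        jobStats.getMapTaskList(MapTaskKeys.FINISH_TIME, KeyDataType.LONG);
    for (int i = 0; i < byFinishTime.size(); i++) {
      MapTaskStatistics mt = byFinishTime.get(i);
      System.out.println(mt.getStringValue(MapTaskKeys.TASK_ID)
          + " finished at " + mt.getLongValue(MapTaskKeys.FINISH_TIME));
    }
    jobStats.printJobExecutionStatistics();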