Subject: svn commit: r814122 [1/2] - in /hadoop/mapreduce/trunk: ./ src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/data/rumen/zombie/ src/tools/org/apache/hadoop/tools/rumen/
Date: Sat, 12 Sep 2009 09:31:36 -0000
To: mapreduce-commits@hadoop.apache.org
From: cdouglas@apache.org

Author: cdouglas
Date: Sat Sep 12 09:31:34 2009
New Revision: 814122

URL: http://svn.apache.org/viewvc?rev=814122&view=rev
Log:
MAPREDUCE-966. Modify Rumen to clean up interfaces and simplify integration with other tools.
Contributed by Hong Tang

Added:
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStoryProducer.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobTraceReader.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MachineNode.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Node.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RackNode.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieCluster.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java

Removed:
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Parser.java

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestHistograms.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java
    hadoop/mapreduce/trunk/src/test/tools/data/rumen/zombie/input-trace.json
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Histogram.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedDiscreteCDF.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceTaskAttemptInfo.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskInfo.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TreePath.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=814122&r1=814121&r2=814122&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Sep 12 09:31:34 2009
@@ -331,6 +331,9 @@
     MAPREDUCE-973. Move FailJob and SleepJob from examples to test.
     (cdouglas via omalley)

+    MAPREDUCE-966. Modify Rumen to clean up interfaces and simplify integration
+    with other tools. (Hong Tang via cdouglas)
+
   BUG FIXES

     MAPREDUCE-878.
Rename fair scheduler design doc to Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestHistograms.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestHistograms.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestHistograms.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestHistograms.java Sat Sep 12 09:31:34 2009 @@ -17,32 +17,25 @@ */ package org.apache.hadoop.tools.rumen; - -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStreamReader; -import java.io.BufferedReader; import java.io.IOException; -import java.io.PrintStream; - -import junit.framework.TestCase; import java.util.List; -import org.codehaus.jackson.JsonParseException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.codehaus.jackson.JsonEncoding; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.map.DeserializationConfig; -import org.codehaus.jackson.JsonProcessingException; -import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; -/** - * - */ -public class TestHistograms extends TestCase { +import org.junit.Test; +import static org.junit.Assert.*; + +public class TestHistograms { /** * @throws IOException @@ -58,107 +51,56 @@ * we read the corresponding goldXxx.json as a LoggedDiscreteCDF and * deepCompare them. 
*/ + @Test public void testHistograms() throws IOException { - String rootInputDir = System.getProperty("test.tools.input.dir", ""); - - File rootInputDirFile = new File(rootInputDir); + final Configuration conf = new Configuration(); + final FileSystem lfs = FileSystem.getLocal(conf); + final Path rootInputDir = new Path( + System.getProperty("test.tools.input.dir", "")).makeQualified(lfs); + final Path rootInputFile = new Path(rootInputDir, "rumen/histogram-tests"); - File rootInputFile = new File(rootInputDirFile, "rumen/histogram-tests"); - if (rootInputDir.charAt(rootInputDir.length() - 1) == '/') { - rootInputDir = rootInputDir.substring(0, rootInputDir.length() - 1); - } - - String[] tests = rootInputFile.list(); + FileStatus[] tests = lfs.listStatus(rootInputFile); for (int i = 0; i < tests.length; ++i) { - if (tests[i].length() > 5 && "input".equals(tests[i].substring(0, 5))) { - File inputData = new File(rootInputFile, tests[i]); - - if (!(new File(rootInputFile, "build" + tests[i].substring(5))) - .exists() - && !(new File(rootInputFile, "gold" + tests[i].substring(5)) - .exists()) - && !(new File(rootInputFile, "silver" + tests[i].substring(5)) - .exists())) { - System.out - .println("Neither a build nor a gold file exists for the file, " - + inputData.getCanonicalPath()); - - continue; + Path filePath = tests[i].getPath(); + String fileName = filePath.getName(); + if (fileName.startsWith("input")) { + String testName = fileName.substring("input".length()); + Path goldFilePath = new Path(rootInputFile, "gold"+testName); + assertTrue("Gold file dies not exist", lfs.exists(goldFilePath)); + LoggedDiscreteCDF newResult = histogramFileToCDF(filePath, lfs); + System.out.println("Testing a Histogram for " + fileName); + FSDataInputStream goldStream = lfs.open(goldFilePath); + JsonObjectMapperParser parser = new JsonObjectMapperParser( + goldStream, LoggedDiscreteCDF.class); + try { + LoggedDiscreteCDF dcdf = parser.getNext(); + dcdf.deepCompare(newResult, new TreePath(null, "")); + } catch (DeepInequalityException e) { + fail(e.path.toString()); } - - LoggedDiscreteCDF newResult = histogramFileToCDF(inputData.getPath()); - - if ((new File(rootInputFile, "build" + tests[i].substring(5))).exists() - && !(new File(rootInputFile, "gold" + tests[i].substring(5))) - .exists() - && !(new File(rootInputFile, "silver" + tests[i].substring(5))) - .exists()) { - try { - System.out.println("Building a new gold file for the file, " - + inputData.getCanonicalPath()); - System.out.println("Please inspect it thoroughly and rename it."); - - ObjectMapper mapper = new ObjectMapper(); - JsonFactory factory = mapper.getJsonFactory(); - PrintStream ostream = new PrintStream(new File(rootInputFile, - "silver" + tests[i].substring(5))); - JsonGenerator gen = factory.createJsonGenerator(ostream, - JsonEncoding.UTF8); - gen.useDefaultPrettyPrinter(); - - gen.writeObject(newResult); - - gen.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } else { - System.out.println("Testing a Histogram built from the file, " - + inputData.getCanonicalPath()); - File goldCDF = new File(rootInputFile, "gold" + tests[i].substring(5)); - FileInputStream goldStream = new FileInputStream(goldCDF); - BufferedReader goldReader = new BufferedReader(new InputStreamReader( - goldStream)); - ObjectMapper goldMapper = new ObjectMapper(); - JsonParser goldParser = goldMapper.getJsonFactory().createJsonParser( - goldReader); - LoggedDiscreteCDF DCDF = goldMapper.readValue(goldParser, - LoggedDiscreteCDF.class); - - try { 
- DCDF.deepCompare(newResult, new TreePath(null, "")); - } catch (DeepInequalityException e) { - String error = e.path.toString(); - - assertFalse(error, true); - } + finally { + parser.close(); } } } } - private static LoggedDiscreteCDF histogramFileToCDF(String filename) + private static LoggedDiscreteCDF histogramFileToCDF(Path path, FileSystem fs) throws IOException { - - File inputData = new File(filename); - - FileInputStream dataStream = new FileInputStream(inputData); - BufferedReader dataReader = new BufferedReader(new InputStreamReader( - dataStream)); - ObjectMapper dataMapper = new ObjectMapper(); - dataMapper.configure( - DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); - JsonParser dataParser = dataMapper.getJsonFactory().createJsonParser( - dataReader); - HistogramRawTestData data = dataMapper.readValue(dataParser, - HistogramRawTestData.class); - + FSDataInputStream dataStream = fs.open(path); + JsonObjectMapperParser parser = new JsonObjectMapperParser( + dataStream, HistogramRawTestData.class); + HistogramRawTestData data; + try { + data = parser.getNext(); + } finally { + parser.close(); + } + Histogram hist = new Histogram(); - List measurements = data.getData(); - List typeProbeData = new HistogramRawTestData().getData(); assertTrue( @@ -180,7 +122,34 @@ } result.setCDF(hist, percentiles, data.getScale()); - return result; } + + public static void main(String[] args) throws IOException { + final Configuration conf = new Configuration(); + final FileSystem lfs = FileSystem.getLocal(conf); + + for (String arg : args) { + Path filePath = new Path(arg).makeQualified(lfs); + String fileName = filePath.getName(); + if (fileName.startsWith("input")) { + LoggedDiscreteCDF newResult = histogramFileToCDF(filePath, lfs); + String testName = fileName.substring("input".length()); + Path goldFilePath = new Path(filePath.getParent(), "gold"+testName); + + ObjectMapper mapper = new ObjectMapper(); + JsonFactory factory = mapper.getJsonFactory(); + FSDataOutputStream ostream = lfs.create(goldFilePath, true); + JsonGenerator gen = factory.createJsonGenerator(ostream, + JsonEncoding.UTF8); + gen.useDefaultPrettyPrinter(); + + gen.writeObject(newResult); + + gen.close(); + } else { + System.err.println("Input file not started with \"input\". 
File "+fileName+" skipped."); + } + } + } } Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestPiecewiseLinearInterpolation.java Sat Sep 12 09:31:34 2009 @@ -20,9 +20,10 @@ import java.util.ArrayList; -import junit.framework.TestCase; +import org.junit.Test; +import static org.junit.Assert.*; -public class TestPiecewiseLinearInterpolation extends TestCase { +public class TestPiecewiseLinearInterpolation { static private double maximumRelativeError = 0.002D; @@ -35,6 +36,7 @@ return result; } + @Test public void testOneRun() { LoggedDiscreteCDF input = new LoggedDiscreteCDF(); Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Sat Sep 12 09:31:34 2009 @@ -18,33 +18,20 @@ package org.apache.hadoop.tools.rumen; -import java.io.BufferedOutputStream; -import java.io.BufferedReader; -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.PrintStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.ToolRunner; -import org.codehaus.jackson.JsonParseException; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonProcessingException; -import org.codehaus.jackson.map.JsonMappingException; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.DeserializationConfig; - -import junit.framework.TestCase; - -public class TestRumenJobTraces extends TestCase { +import org.junit.Test; +import static org.junit.Assert.*; +public class TestRumenJobTraces { + @Test public void testSmallTrace() throws Exception { final Configuration conf = new Configuration(); final FileSystem lfs = FileSystem.getLocal(conf); @@ -59,11 +46,6 @@ final Path tempDir = new Path(rootTempDir, "TestRumenJobTraces"); lfs.delete(tempDir, true); - assertFalse("property test.build.data is not defined", - "".equals(rootTempDir)); - assertFalse("property test.tools.input.dir is not defined", - "".equals(rootInputDir)); - final Path topologyFile = new Path(tempDir, "topology.json"); final Path traceFile = new Path(tempDir, "trace.json"); @@ -84,248 +66,50 @@ args[5] = inputFile.toString(); - PrintStream old_stdout = System.out; - - final Path stdoutFile = new Path(tempDir, "stdout.text"); - - System.out.println("stdout file = " + stdoutFile); - - PrintStream enveloped_stdout = new 
PrintStream(new BufferedOutputStream( - lfs.create(stdoutFile, true))); - final Path topologyGoldFile = new Path(rootInputFile, "job-tracker-logs-topology-output"); final Path traceGoldFile = new Path(rootInputFile, "job-tracker-logs-trace-output"); - try { - System.setOut(enveloped_stdout); - - HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer(); - - int result = ToolRunner.run(analyzer, args); - - enveloped_stdout.close(); - - assertEquals("Non-zero exit", 0, result); - - } finally { - System.setOut(old_stdout); - } - - jsonFileMatchesGold(lfs, topologyFile, topologyGoldFile, - new LoggedNetworkTopology(), "topology"); - jsonFileMatchesGold(lfs, traceFile, traceGoldFile, new LoggedJob(), - "trace"); - + HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer(); + int result = ToolRunner.run(analyzer, args); + assertEquals("Non-zero exit", 0, result); + + TestRumenJobTraces + . jsonFileMatchesGold(lfs, topologyFile, + topologyGoldFile, LoggedNetworkTopology.class, "topology"); + TestRumenJobTraces. jsonFileMatchesGold(lfs, traceFile, + traceGoldFile, LoggedJob.class, "trace"); } - /* - * This block of methods is commented out because its methods require huge - * test files to support them meaningfully. We expect to be able to fix this - * problem in a furture release. - * - * public void testBulkFilesJobDistro() throws IOException { String args[] = { - * "-v1", "-delays", "-runtimes" }; statisticalTest(args, - * "rumen/large-test-inputs/monolithic-files", - * "rumen/large-test-inputs/gold-bulk-job-distribution.text", true); } - * - * public void testIndividualFilesJobDistro() throws IOException { String - * args[] = { "-v1", "-delays", "-runtimes" }; statisticalTest(args, - * "rumen/large-test-inputs/individual-files", - * "rumen/large-test-inputs/gold-individual-job-distribution.text", true); } - * - * public void testSpreadsGZFile() throws IOException { String args[] = { - * "-v1", "-delays", "-runtimes", "-spreads", "10", "90", - * "-job-digest-spectra", "10", "50", "90" }; statisticalTest( args, - * "rumen/large-test-inputs/monolithic-files/jobs-0-99-including-truncations.gz" - * , "rumen/large-test-inputs/gold-single-gz-task-distribution.text", false); - * } - * - * public void testSpreadsSingleFile() throws IOException { String args[] = { - * "-v1", "-delays", "-runtimes", "-spreads", "10", "90", - * "-job-digest-spectra", "10", "50", "90" }; statisticalTest(args, - * "rumen/large-test-inputs/monolithic-files/jobs-100-199", - * "rumen/large-test-inputs/gold-single-bulk-task-distribution.text", false); - * } - */ - - /** - * - * A test case of HadoopLogsAnalyzer.main consists of a call to this function. - * It succeeds by returning,fails by performing a junit assertion failure, and - * can abend with an I/O error if some of the inputs aren't there or some of - * the output cannot be written [due to quota, perhaps, or permissions - * - * - * @param args - * these are the arguments that we eventually supply to - * HadoopLogsAnalyzer.main to test its functionality with regard to - * statistical output - * @param inputFname - * this is the file name or directory name of the test input - * directory relative to the test cases data directory. - * @param goldFilename - * this is the file name of the expected output relative to the test - * cases data directory. - * @param inputIsDirectory - * this states whether the input is an entire directory, or a single - * file. 
- * @throws IOException - */ - private void statisticalTest(String args[], String inputFname, - String goldFilename, boolean inputIsDirectory) throws Exception { - File tempDirectory = new File(System.getProperty("test.build.data", "/tmp")); - - String rootInputDir = System.getProperty("test.tools.input.dir", ""); - String rootTempDir = System.getProperty("test.build.data", ""); - - File rootInputDirFile = new File(new File(rootInputDir), inputFname); - File tempDirFile = new File(rootTempDir); - - assertFalse("property test.build.data is not defined", "" - .equals(rootTempDir)); - assertFalse("property test.tools.input.dir is not defined", "" - .equals(rootInputDir)); - - if (rootInputDir.charAt(rootInputDir.length() - 1) == '/') { - rootInputDir = rootInputDir.substring(0, rootInputDir.length() - 1); - } - - if (rootTempDir.charAt(rootTempDir.length() - 1) == '/') { - rootTempDir = rootTempDir.substring(0, rootTempDir.length() - 1); - } - - File jobDistroGold = new File(new File(rootInputDir), goldFilename); - - String[] newArgs = new String[args.length + 1]; - - System.arraycopy(args, 0, newArgs, 0, args.length); - - newArgs[args.length + 1 - 1] = rootInputDirFile.getPath(); - - String complaint = inputIsDirectory ? " is not a directory." - : " does not exist."; - - boolean okay = inputIsDirectory ? rootInputDirFile.isDirectory() - : rootInputDirFile.canRead(); - - assertTrue("The input file " + rootInputDirFile.getPath() + complaint, okay); - - PrintStream old_stdout = System.out; - - File stdoutFile = File.createTempFile("stdout", "text", tempDirFile); - - // stdoutFile.deleteOnExit(); - - PrintStream enveloped_stdout = new PrintStream(new BufferedOutputStream( - new FileOutputStream(stdoutFile))); - - try { - System.setOut(enveloped_stdout); - - HadoopLogsAnalyzer analyzer = new HadoopLogsAnalyzer(); - - int result = ToolRunner.run(analyzer, args); - - enveloped_stdout.close(); - - System.setOut(old_stdout); - - assertFilesMatch(stdoutFile, jobDistroGold); - assertEquals("Non-zero exit", 0, result); - } finally { - System.setOut(old_stdout); - } - } - - static private Object readMapper(ObjectMapper mapper, JsonParser parser, - Object obj) throws IOException { - try { - return mapper.readValue(parser, obj.getClass()); - } catch (EOFException e) { - return null; - } - } - - static private void assertFilesMatch(File result, File gold) - throws IOException { - System.out.println("Comparing files: " + result.getPath() + " vrs. 
" - + gold.getPath()); - - int currentLineNumber = 1; - FileInputStream goldStream = new FileInputStream(gold); - BufferedReader goldReader = new BufferedReader(new InputStreamReader( - goldStream)); - String currentGoldLine = goldReader.readLine(); - - FileInputStream resultStream = new FileInputStream(result); - BufferedReader resultReader = new BufferedReader(new InputStreamReader( - resultStream)); - String currentResultLine = resultReader.readLine(); - - while (currentGoldLine != null && currentResultLine != null - && currentGoldLine.equals(currentResultLine)) { - ++currentLineNumber; - - currentGoldLine = goldReader.readLine(); - currentResultLine = resultReader.readLine(); - } - - if (currentGoldLine == null && currentResultLine == null) { - return; - } - - assertFalse("Line number " + currentLineNumber + " disagrees", true); - } - - static private void jsonFileMatchesGold(FileSystem lfs, Path result, - Path gold, Object obj, String fileDescription) throws IOException { + static private void jsonFileMatchesGold( + FileSystem lfs, Path result, Path gold, Class clazz, + String fileDescription) throws IOException { InputStream goldStream = lfs.open(gold); - BufferedReader goldReader = new BufferedReader(new InputStreamReader( - goldStream)); - + JsonObjectMapperParser goldParser = new JsonObjectMapperParser( + goldStream, clazz); InputStream resultStream = lfs.open(result); - BufferedReader resultReader = new BufferedReader(new InputStreamReader( - resultStream)); - - ObjectMapper goldMapper = new ObjectMapper(); - ObjectMapper resultMapper = new ObjectMapper(); - goldMapper.configure( - DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); - resultMapper.configure( - DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); - - JsonParser goldParser = goldMapper.getJsonFactory().createJsonParser( - goldReader); - JsonParser resultParser = resultMapper.getJsonFactory().createJsonParser( - resultReader); - - DeepCompare goldJob = (DeepCompare) readMapper(goldMapper, goldParser, obj); - DeepCompare resultJob = (DeepCompare) readMapper(resultMapper, - resultParser, obj); - - while (goldJob != null && resultJob != null) { - try { - resultJob.deepCompare(goldJob, new TreePath(null, "")); - } catch (DeepInequalityException e) { - String error = e.path.toString(); + JsonObjectMapperParser resultParser = new JsonObjectMapperParser( + resultStream, clazz); + try { + while (true) { + DeepCompare goldJob = goldParser.getNext(); + DeepCompare resultJob = resultParser.getNext(); + if ((goldJob == null) || (resultJob == null)) { + assertTrue(goldJob == resultJob); + break; + } + + try { + resultJob.deepCompare(goldJob, new TreePath(null, "")); + } catch (DeepInequalityException e) { + String error = e.path.toString(); - assertFalse(fileDescription + " mismatches: " + error, true); + assertFalse(fileDescription + " mismatches: " + error, true); + } } - - goldJob = (DeepCompare) readMapper(goldMapper, goldParser, obj); - resultJob = (DeepCompare) readMapper(resultMapper, resultParser, obj); - } - - if (goldJob != null) { - assertFalse( - "The Gold File has more logged jobs than the result of the run", true); - } - - if (resultJob != null) { - assertFalse("The result file has more logged jobs than the Gold File", - true); + } finally { + IOUtils.cleanup(null, goldParser, resultParser); } } } Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestZombieJob.java Sat Sep 12 09:31:34 2009 @@ -18,22 +18,20 @@ package org.apache.hadoop.tools.rumen; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.util.Vector; import java.util.List; import java.util.ArrayList; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.TaskStatus.State; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskType; -import junit.framework.TestCase; - -public class TestZombieJob extends TestCase { +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.*; +public class TestZombieJob { final double epsilon = 0.01; private final int[] attemptTimesPercentiles = new int[] { 10, 50, 90 }; private long[] succeededCDF = new long[] { 5268, 5268, 5268, 5268, 5268 }; @@ -41,26 +39,26 @@ private double[] expectedPs = new double[] { 0.000001, 0.18707660239708182, 0.0013027618551328818, 2.605523710265763E-4 }; + private final long[] mapTaskCounts = new long[] { 7838525L, 342277L, 100228L, + 1564L, 1234L }; + private final long[] reduceTaskCounts = new long[] { 4405338L, 139391L, + 1514383L, 139391, 1234L }; + List loggedJobs = new ArrayList(); List jobStories = new ArrayList(); - /* - * (non-Javadoc) - * - * @see junit.framework.TestCase#setUp() - */ - protected void setUp() throws Exception { - String rootTempDir = System.getProperty("test.build.data", "/tmp"); - - String rootInputDir = System.getProperty("test.tools.input.dir", ""); - - File rootInputFile = new File(new File(rootInputDir), "rumen/zombie"); - File tempDirFile = new File(rootTempDir); - - Parser parser = new Parser(new FileReader(new File(rootInputFile, - "input-trace.json"))); - - parser.readTopology(new File(rootInputFile, "input-topology.json")); + @Before + public void setUp() throws Exception { + final Configuration conf = new Configuration(); + final FileSystem lfs = FileSystem.getLocal(conf); + + final Path rootInputDir = new Path( + System.getProperty("test.tools.input.dir", "")).makeQualified(lfs); + final Path rootInputFile = new Path(rootInputDir, "rumen/zombie"); + + ZombieJobProducer parser = new ZombieJobProducer(new Path(rootInputFile, + "input-trace.json"), new ZombieCluster(new Path(rootInputFile, + "input-topology.json"), null, conf), conf); JobStory job = null; for (int i = 0; i < 4; i++) { @@ -74,12 +72,6 @@ System.out.println("Input Splits -- " + job.getInputSplits().length + ", " + job.getNumberMaps()); - /* - * for (InputSplit split: job.getInputSplits()) { - * System.out.print(split.getLength() + ": "); for (String location: - * split.getLocations()) { System.out.print(location + ","); } - * System.out.println(); } - */ System.out.println("Successful Map CDF -------"); for (LoggedDiscreteCDF cdf : loggedJob.getSuccessfulMapAttemptCDFs()) { @@ -125,21 +117,10 @@ loggedJobs.add(loggedJob); jobStories.add(job); } - - super.setUp(); - } - - /* - * (non-Javadoc) - * - * @see junit.framework.TestCase#tearDown() - */ - protected void tearDown() 
throws Exception { - super.tearDown(); } - public void testFirstJob() throws FileNotFoundException, IOException, - InterruptedException { + @Test + public void testFirstJob() { // 20th job seems reasonable: "totalMaps":329,"totalReduces":101 // successful map: 80 node-local, 196 rack-local, 53 rack-remote, 2 unknown // failed map: 0-0-0-1 @@ -177,8 +158,8 @@ // TODO fill in test case } - public void testSecondJob() throws FileNotFoundException, IOException, - InterruptedException { + @Test + public void testSecondJob() { // 7th job has many failed tasks. // 3204 m, 0 r // successful maps 497-586-23-1, failed maps 0-0-0-2714 @@ -209,8 +190,8 @@ // available data set. } - public void testFourthJob() throws FileNotFoundException, IOException, - InterruptedException { + @Test + public void testFourthJob() { // 7th job has many failed tasks. // 3204 m, 0 r // successful maps 497-586-23-1, failed maps 0-0-0-2714 @@ -241,8 +222,29 @@ assertEquals(State.FAILED, taInfo.getRunState()); } - public void testMakeUpInfo() throws FileNotFoundException, IOException, - InterruptedException { + @Test + public void testRecordIOInfo() { + JobStory job = jobStories.get(3); + + TaskInfo mapTask = job.getTaskInfo(TaskType.MAP, 113); + + TaskInfo reduceTask = job.getTaskInfo(TaskType.REDUCE, 0); + + assertEquals(mapTaskCounts[0], mapTask.getInputBytes()); + assertEquals(mapTaskCounts[1], mapTask.getInputRecords()); + assertEquals(mapTaskCounts[2], mapTask.getOutputBytes()); + assertEquals(mapTaskCounts[3], mapTask.getOutputRecords()); + assertEquals(mapTaskCounts[4], mapTask.getTaskMemory()); + + assertEquals(reduceTaskCounts[0], reduceTask.getInputBytes()); + assertEquals(reduceTaskCounts[1], reduceTask.getInputRecords()); + assertEquals(reduceTaskCounts[2], reduceTask.getOutputBytes()); + assertEquals(reduceTaskCounts[3], reduceTask.getOutputRecords()); + assertEquals(reduceTaskCounts[4], reduceTask.getTaskMemory()); + } + + @Test + public void testMakeUpInfo() { // get many non-exist tasks // total 3204 map tasks, 3300 is a non-exist task. 
checkMakeUpTask(jobStories.get(3), 113, 1); @@ -253,7 +255,7 @@ Histogram sampleSucceeded = new Histogram(); Histogram sampleFailed = new Histogram(); - Vector sampleAttempts = new Vector(); + List sampleAttempts = new ArrayList(); for (int i = 0; i < 100000; i++) { int attemptId = 0; while (true) { Modified: hadoop/mapreduce/trunk/src/test/tools/data/rumen/zombie/input-trace.json URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/tools/data/rumen/zombie/input-trace.json?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/tools/data/rumen/zombie/input-trace.json (original) +++ hadoop/mapreduce/trunk/src/test/tools/data/rumen/zombie/input-trace.json Sat Sep 12 09:31:34 2009 @@ -10951,7 +10951,7 @@ "computonsPerReduceOutputByte" : -1, "submitTime" : 1240336853354, "launchTime" : 1240336854289, - "heapMegabytes" : -1, + "heapMegabytes" : 1234, "totalMaps" : 131, "totalReduces" : 47, "outcome" : "SUCCESS", Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java?rev=814122&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/AbstractClusterStory.java Sat Sep 12 09:31:34 2009 @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * {@link AbstractClusterStory} provides a partial implementation of + * {@link ClusterStory} by parsing the topology tree. 
+ */ +public abstract class AbstractClusterStory implements ClusterStory { + protected Set machineNodes; + protected Set rackNodes; + protected MachineNode[] mNodesFlattened; + protected Map mNodeMap; + protected Map rNodeMap; + protected int maximumDistance = 0; + protected Random random; + + @Override + public Set getMachines() { + parseTopologyTree(); + return machineNodes; + } + + @Override + public synchronized Set getRacks() { + parseTopologyTree(); + return rackNodes; + } + + @Override + public synchronized MachineNode[] getRandomMachines(int expected) { + if (expected == 0) { + return new MachineNode[0]; + } + + parseTopologyTree(); + int total = machineNodes.size(); + int select = Math.min(expected, total); + + if (mNodesFlattened == null) { + mNodesFlattened = machineNodes.toArray(new MachineNode[total]); + random = new Random(); + } + + MachineNode[] retval = new MachineNode[select]; + int i = 0; + while ((i != select) && (total != i + select)) { + int index = random.nextInt(total - i); + MachineNode tmp = mNodesFlattened[index]; + mNodesFlattened[index] = mNodesFlattened[total - i - 1]; + mNodesFlattened[total - i - 1] = tmp; + ++i; + } + if (i == select) { + System.arraycopy(mNodesFlattened, total - i, retval, 0, select); + } else { + System.arraycopy(mNodesFlattened, 0, retval, 0, select); + } + + return retval; + } + + protected synchronized void buildMachineNodeMap() { + if (mNodeMap == null) { + mNodeMap = new HashMap(machineNodes.size()); + for (MachineNode mn : machineNodes) { + mNodeMap.put(mn.getName(), mn); + } + } + } + + @Override + public MachineNode getMachineByName(String name) { + buildMachineNodeMap(); + return mNodeMap.get(name); + } + + @Override + public int distance(Node a, Node b) { + int lvl_a = a.getLevel(); + int lvl_b = b.getLevel(); + int retval = 0; + if (lvl_a > lvl_b) { + retval = lvl_a-lvl_b; + for (int i=0; i(rackNodes.size()); + for (RackNode rn : rackNodes) { + rNodeMap.put(rn.getName(), rn); + } + } + } + + @Override + public RackNode getRackByName(String name) { + buildRackNodeMap(); + return rNodeMap.get(name); + } + + @Override + public int getMaximumDistance() { + parseTopologyTree(); + return maximumDistance; + } + + protected synchronized void parseTopologyTree() { + if (machineNodes == null) { + Node root = getClusterTopology(); + SortedSet mNodes = new TreeSet(); + SortedSet rNodes = new TreeSet(); + // dfs search of the tree. 
+ Deque unvisited = new ArrayDeque(); + Deque distUnvisited = new ArrayDeque(); + unvisited.add(root); + distUnvisited.add(0); + for (Node n = unvisited.poll(); n != null; n = unvisited.poll()) { + int distance = distUnvisited.poll(); + if (n instanceof RackNode) { + rNodes.add((RackNode) n); + mNodes.addAll(((RackNode) n).getMachinesInRack()); + if (distance + 1 > maximumDistance) { + maximumDistance = distance + 1; + } + } else if (n instanceof MachineNode) { + mNodes.add((MachineNode) n); + if (distance > maximumDistance) { + maximumDistance = distance; + } + } else { + for (Node child : n.getChildren()) { + unvisited.addFirst(child); + distUnvisited.addFirst(distance+1); + } + } + } + + machineNodes = Collections.unmodifiableSortedSet(mNodes); + rackNodes = Collections.unmodifiableSortedSet(rNodes); + } + } +} Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/CDFPiecewiseLinearRandomGenerator.java Sat Sep 12 09:31:34 2009 @@ -17,10 +17,6 @@ */ package org.apache.hadoop.tools.rumen; -/** - * - * - */ public class CDFPiecewiseLinearRandomGenerator extends CDFRandomGenerator { /** @@ -43,7 +39,7 @@ super(cdf, seed); } - /* + /** * TODO This code assumes that the empirical minimum resp. maximum is the * epistomological minimum resp. maximum. This is probably okay for the * minimum, because that likely represents a task where everything went well, Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java?rev=814122&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterStory.java Sat Sep 12 09:31:34 2009 @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.util.Set; + +/** + * {@link ClusterStory} represents all configurations of a MapReduce cluster, + * including nodes, network topology, and slot configurations. + */ +public interface ClusterStory { + /** + * Get all machines of the cluster. 
+ * @return A read-only set that contains all machines of the cluster. + */ + public Set getMachines(); + + /** + * Get all racks of the cluster. + * @return A read-only set that contains all racks of the cluster. + */ + public Set getRacks(); + + /** + * Get the cluster topology tree. + * @return The root node of the cluster topology tree. + */ + public Node getClusterTopology(); + + /** + * Select a random set of machines. + * @param expected The expected sample size. + * @return An array of up to expected number of {@link MachineNode}s. + */ + public MachineNode[] getRandomMachines(int expected); + + /** + * Get {@link MachineNode} by its host name. + * + * @return The {@line MachineNode} with the same name. Or null if not found. + */ + public MachineNode getMachineByName(String name); + + /** + * Get {@link RackNode} by its name. + * @return The {@line RackNode} with the same name. Or null if not found. + */ + public RackNode getRackByName(String name); + + /** + * Determine the distance between two {@link Node}s. Currently, the distance + * is loosely defined as the length of the longer path for either a or b to + * reach their common ancestor. + * + * @param a + * @param b + * @return The distance between {@link Node} a and {@link Node} b. + */ + int distance(Node a, Node b); + + /** + * Get the maximum distance possible between any two nodes. + * @return the maximum distance possible between any two nodes. + */ + int getMaximumDistance(); +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java?rev=814122&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ClusterTopologyReader.java Sat Sep 12 09:31:34 2009 @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +/** + * Reading JSON-encoded cluster topology and produce the parsed + * {@link LoggedNetworkTopology} object. + */ +public class ClusterTopologyReader { + private LoggedNetworkTopology topology; + + private void readTopology(JsonObjectMapperParser parser) + throws IOException { + try { + topology = parser.getNext(); + if (topology == null) { + throw new IOException( + "Input file does not contain valid topology data."); + } + } finally { + parser.close(); + } + } + + /** + * Constructor. 
+ * + * @param path + * Path to the JSON-encoded topology file, possibly compressed. + * @param conf + * @throws IOException + */ + public ClusterTopologyReader(Path path, Configuration conf) + throws IOException { + JsonObjectMapperParser parser = new JsonObjectMapperParser( + path, LoggedNetworkTopology.class, conf); + readTopology(parser); + } + + /** + * Constructor. + * + * @param input + * The input stream for the JSON-encoded topology data. + */ + public ClusterTopologyReader(InputStream input) throws IOException { + JsonObjectMapperParser parser = new JsonObjectMapperParser( + input, LoggedNetworkTopology.class); + readTopology(parser); + } + + /** + * Get the {@link LoggedNetworkTopology} object. + * + * @return The {@link LoggedNetworkTopology} object parsed from the input. + */ + public LoggedNetworkTopology get() { + return topology; + } +} Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Sat Sep 12 09:31:34 2009 @@ -66,7 +66,7 @@ import org.codehaus.jackson.map.SerializationConfig; /** - * This is the mainclass for rumen log mining functionality. + * This is the main class for rumen log mining functionality. * * It reads a directory of job tracker logs, and computes various information * about it. See {@code usage()}, below. @@ -85,11 +85,12 @@ */ private final static int MAXIMUM_PREFERRED_LOCATIONS = 25; - // This element is to compensate for the fact that our percentiles - // engine rounds up for the expected sample count, so if the total - // number of readings is small enough we need to compensate slightly - // when aggregating the spread data from jobs with few reducers together - // with jobs with many reducers. + /** + * This element is to compensate for the fact that our percentiles engine + * rounds up for the expected sample count, so if the total number of readings + * is small enough we need to compensate slightly when aggregating the spread + * data from jobs with few reducers together with jobs with many reducers. 
+ */ private static final long SMALL_SPREAD_COMPENSATION_THRESHOLD = 5L; /** @@ -317,7 +318,7 @@ Path jobTraceFilename = null; Path topologyFilename = null; if (args.length == 0 || args[args.length - 1].charAt(0) == '-') { - inputFilename = null; + throw new IllegalArgumentException("No input specified."); } else { inputFilename = args[args.length - 1]; } @@ -547,6 +548,7 @@ IOException { if (input != null) { input.close(); + LOG.info("File closed: "+currentFileName); input = null; } @@ -580,6 +582,9 @@ private String readInputLine() throws IOException { try { + if (input == null) { + return null; + } inputLineText.clear(); if (input.readLine(inputLineText) == 0) { return null; @@ -1002,7 +1007,7 @@ ParsedHost node = getAndRecordParsedHost(nextSplit); - if (locations != null) { + if (locations != null && node != null) { locations.add(node.makeLoggedLocation()); } } @@ -1104,82 +1109,95 @@ private void incorporateCounters(LoggedTaskAttempt attempt2, String counterString) { incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.hdfsBytesRead = val; } }, counterString, "HDFS_BYTES_READ"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.hdfsBytesWritten = val; } }, counterString, "HDFS_BYTES_WRITTEN"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.fileBytesRead = val; } }, counterString, "FILE_BYTES_READ"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.fileBytesWritten = val; } }, counterString, "FILE_BYTES_WRITTEN"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.mapInputBytes = val; } }, counterString, "MAP_INPUT_BYTES"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.mapInputRecords = val; } }, counterString, "MAP_INPUT_RECORDS"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.mapOutputBytes = val; } }, counterString, "MAP_OUTPUT_BYTES"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.mapOutputRecords = val; } }, counterString, "MAP_OUTPUT_RECORDS"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.combineInputRecords = val; } }, counterString, "COMBINE_INPUT_RECORDS"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.reduceInputGroups = val; } }, counterString, "REDUCE_INPUT_GROUPS"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.reduceInputRecords = val; } }, counterString, "REDUCE_INPUT_RECORDS"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.reduceShuffleBytes = val; } }, counterString, "REDUCE_SHUFFLE_BYTES"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.reduceOutputRecords = val; } }, counterString, "REDUCE_OUTPUT_RECORDS"); incorporateCounter(new SetField(attempt2) { + @Override void set(long val) { attempt.spilledRecords = val; } }, counterString, "SPILLED_RECORDS"); } - private ParsedHost getAndRecordParsedHost(String hostName) - throws IllegalArgumentException { - ParsedHost result = new ParsedHost(hostName); + private ParsedHost getAndRecordParsedHost(String hostName) { + ParsedHost result = ParsedHost.parse(hostName); - if (!allHosts.contains(result)) { + if (result != null && !allHosts.contains(result)) { allHosts.add(result); } @@ -1259,28 +1277,23 @@ ParsedHost host = null; - try { - host = 
getAndRecordParsedHost(hostName); - } catch (IllegalArgumentException e) { - } + host = getAndRecordParsedHost(hostName); if (host != null) { attempt.setLocation(host.makeLoggedLocation()); } - if (task != null) { - ArrayList locs = task.getPreferredLocations(); + ArrayList locs = task.getPreferredLocations(); - if (host != null && locs != null) { - for (LoggedLocation loc : locs) { - ParsedHost preferedLoc = new ParsedHost(loc); + if (host != null && locs != null) { + for (LoggedLocation loc : locs) { + ParsedHost preferedLoc = new ParsedHost(loc); - distance = Math.min(distance, preferedLoc.distance(host)); - } + distance = Math.min(distance, preferedLoc.distance(host)); } - - mapperLocality.enter(distance); } + + mapperLocality.enter(distance); } distance = Math.min(distance, successfulMapAttemptTimes.length - 1); Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Histogram.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Histogram.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Histogram.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Histogram.java Sat Sep 12 09:31:34 2009 @@ -127,12 +127,14 @@ long cumulativeCount = 0; int bucketCursor = 0; + // Loop invariant: the item at buckets[bucketCursor] can still be reached - // from iter, and the number of logged elements no longer - // available from iter is cumulativeCount . - // + // from iter, and the number of logged elements no longer available from + // iter is cumulativeCount. + // // cumulativeCount/totalCount is therefore strictly less than // buckets[bucketCursor]/scale . + while (iter.hasNext()) { long targetCumulativeCount = buckets[bucketCursor] * totalCount / scale; Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStory.java Sat Sep 12 09:31:34 2009 @@ -18,6 +18,7 @@ package org.apache.hadoop.tools.rumen; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobHistory; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TaskType; @@ -99,7 +100,6 @@ /** * Get {@link TaskAttemptInfo} for a given task-attempt, considering impact * of locality. - * @param taskType {@link TaskType} of the task-attempt * @param taskNumber Partition number of the task-attempt * @param taskAttemptNumber Attempt number of the task * @param locality Data locality of the task as scheduled in simulation @@ -109,4 +109,10 @@ getMapTaskAttemptInfoAdjusted(int taskNumber, int taskAttemptNumber, int locality); + + /** + * Get the outcome of the job execution. + * @return The outcome of the job execution. 
+ */ + public JobHistory.Values getOutcome(); } Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStoryProducer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStoryProducer.java?rev=814122&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStoryProducer.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobStoryProducer.java Sat Sep 12 09:31:34 2009 @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.io.Closeable; +import java.io.IOException; + +/** + * {@link JobStoryProducer} produces the sequence of {@link JobStory}'s. + */ +public interface JobStoryProducer extends Closeable { + /** + * Get the next job. + * @return The next job. Or null if no more job is available. + * @throws IOException + */ + JobStory getNextJob() throws IOException; +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobTraceReader.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobTraceReader.java?rev=814122&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobTraceReader.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobTraceReader.java Sat Sep 12 09:31:34 2009 @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +/** + * Reading JSON-encoded job traces and produce {@link LoggedJob} instances. + */ +public class JobTraceReader extends JsonObjectMapperParser { + /** + * Constructor. 
+ * + * @param path + * Path to the JSON trace file, possibly compressed. + * @param conf + * @throws IOException + */ + public JobTraceReader(Path path, Configuration conf) throws IOException { + super(path, LoggedJob.class, conf); + } + + /** + * Constructor. + * + * @param input + * The input stream for the JSON trace. + */ + public JobTraceReader(InputStream input) throws IOException { + super(input, LoggedJob.class); + } +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java?rev=814122&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java Sat Sep 12 09:31:34 2009 @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.Decompressor; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * A simple wrapper for parsing JSON-encoded data using ObjectMapper. + * @param The (base) type of the object(s) to be parsed by this parser. + */ +class JsonObjectMapperParser implements Closeable { + private final ObjectMapper mapper; + private final Class clazz; + private final JsonParser jsonParser; + private final Decompressor decompressor; + + /** + * Constructor. + * + * @param path + * Path to the JSON data file, possibly compressed. 
+ * @param conf + * @throws IOException + */ + public JsonObjectMapperParser(Path path, Class clazz, + Configuration conf) throws IOException { + mapper = new ObjectMapper(); + mapper.configure( + DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); + this.clazz = clazz; + FileSystem fs = path.getFileSystem(conf); + CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path); + InputStream input; + if (codec == null) { + input = fs.open(path); + decompressor = null; + } else { + FSDataInputStream fsdis = fs.open(path); + decompressor = CodecPool.getDecompressor(codec); + input = codec.createInputStream(fsdis, decompressor); + } + jsonParser = mapper.getJsonFactory().createJsonParser(input); + } + + /** + * Constructor. + * + * @param input + * The input stream for the JSON data. + */ + public JsonObjectMapperParser(InputStream input, Class clazz) + throws IOException { + mapper = new ObjectMapper(); + mapper.configure( + DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); + this.clazz = clazz; + decompressor = null; + jsonParser = mapper.getJsonFactory().createJsonParser(input); + } + + /** + * Get the next object from the trace. + * + * @return The next instance of the object. Or null if we reach the end of + * stream. + * @throws IOException + */ + public T getNext() throws IOException { + try { + return mapper.readValue(jsonParser, clazz); + } catch (EOFException e) { + return null; + } + } + + @Override + public void close() throws IOException { + try { + jsonParser.close(); + } finally { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + } + } + } +} Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedDiscreteCDF.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedDiscreteCDF.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedDiscreteCDF.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedDiscreteCDF.java Sat Sep 12 09:31:34 2009 @@ -50,20 +50,8 @@ void setCDF(Histogram data, int[] steps, int modulus) { numberValues = data.getTotalCount(); - - // HadoopLogsAnalyzer.printStackTrace(); - // data.dump(System.out); - long[] CDF = data.getCDF(modulus, steps); - /* - * if (CDF == null) { System.out.print("(null result)\n"); } else { for - * (long step : CDF) { System.out.print("One result row: " + step + "\n"); } - * } - * - * System.out.print("\n"); - */ - if (CDF != null) { minimum = CDF[0]; maximum = CDF[CDF.length - 1]; Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Sat Sep 12 09:31:34 2009 @@ -102,7 +102,7 @@ setJobID(jobID); } - // TODO consider having default readers on the other objects + @SuppressWarnings("unused") // for input parameter ignored. 
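A hedged sketch of driving the two reader classes above from client code; the trace path and the TraceDump class are invented, while the JobTraceReader constructor and the inherited getNext()/close() methods are the ones added in this change (getNext() returns null at end of stream):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.rumen.JobTraceReader;
import org.apache.hadoop.tools.rumen.LoggedJob;

public class TraceDump {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path trace = new Path(args[0]);    // JSON job trace, possibly compressed

    JobTraceReader reader = new JobTraceReader(trace, conf);
    try {
      LoggedJob job;
      // getNext() is inherited from JsonObjectMapperParser (parameterized
      // with LoggedJob) and returns null once the stream is exhausted.
      while ((job = reader.getNext()) != null) {
        // consume the LoggedJob here
      }
    } finally {
      reader.close();  // also returns any borrowed Decompressor to the CodecPool
    }
  }
}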
@JsonAnySetter public void setUnknownAttribute(String attributeName, Object ignored) { if (!alreadySeenAnySetterAttributes.contains(attributeName)) { Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java Sat Sep 12 09:31:34 2009 @@ -37,9 +37,11 @@ * */ public class LoggedLocation implements DeepCompare { - // The full path from the root of the network to the host. - // - // NOTE that this assumes that the network topology is a tree. + /** + * The full path from the root of the network to the host. + * + * NOTE that this assumes that the network topology is a tree. + */ List layers = new ArrayList(); public List getLayers() { Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java Sat Sep 12 09:31:34 2009 @@ -50,7 +50,7 @@ * order. * */ - static private class TopoSort implements Comparator { + static class TopoSort implements Comparator { public int compare(LoggedNetworkTopology t1, LoggedNetworkTopology t2) { return t1.name.compareTo(t2.name); } Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Sat Sep 12 09:31:34 2009 @@ -245,11 +245,4 @@ compareLoggedLocations(preferredLocations, other.preferredLocations, loc, "preferredLocations"); } - /* - * ArrayList attempts = new ArrayList(); - * - * ArrayList preferredLocations; - * - * int numberMaps = -1; int numberReduces = -1; - */ } Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MachineNode.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MachineNode.java?rev=814122&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MachineNode.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MachineNode.java Sat Sep 12 09:31:34 2009 @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +/** + * {@link MachineNode} represents the configuration of a cluster node. + * {@link MachineNode} should be constructed by {@link MachineNode.Builder}. + */ +public final class MachineNode extends Node { + long memory = -1; // in KB + int mapSlots = 1; + int reduceSlots = 1; + long memoryPerMapSlot = -1; // in KB + long memoryPerReduceSlot = -1; // in KB + int numCores = 1; + + MachineNode(String name, int level) { + super(name, level); + } + + @Override + public boolean equals(Object obj) { + // name/level sufficient + return super.equals(obj); + } + + @Override + public int hashCode() { + // match equals + return super.hashCode(); + } + + /** + * Get the available physical RAM of the node. + * @return The available physical RAM of the node, in KB. + */ + public long getMemory() { + return memory; + } + + /** + * Get the number of map slots of the node. + * @return The number of map slots of the node. + */ + public int getMapSlots() { + return mapSlots; + } + + /** + * Get the number of reduce slots of the node. + * @return The number of reduce slots fo the node. + */ + public int getReduceSlots() { + return reduceSlots; + } + + /** + * Get the amount of RAM reserved for each map slot. + * @return the amount of RAM reserved for each map slot, in KB. + */ + public long getMemoryPerMapSlot() { + return memoryPerMapSlot; + } + + /** + * Get the amount of RAM reserved for each reduce slot. + * @return the amount of RAM reserved for each reduce slot, in KB. + */ + public long getMemoryPerReduceSlot() { + return memoryPerReduceSlot; + } + + /** + * Get the number of cores of the node. + * @return the number of cores of the node. + */ + public int getNumCores() { + return numCores; + } + + /** + * Get the rack node that the machine belongs to. + * + * @return The rack node that the machine belongs to. Returns null if the + * machine does not belong to any rack. + */ + public RackNode getRackNode() { + return (RackNode)getParent(); + } + + @Override + public synchronized boolean addChild(Node child) { + throw new IllegalStateException("Cannot add child to MachineNode"); + } + + /** + * Builder for a NodeInfo object + */ + public static final class Builder { + private MachineNode node; + + /** + * Start building a new NodeInfo object. + * @param name + * Unique name of the node. Typically the fully qualified domain + * name. + */ + public Builder(String name, int level) { + node = new MachineNode(name, level); + } + + /** + * Set the physical memory of the node. + * @param memory Available RAM in KB. + */ + public Builder setMemory(long memory) { + node.memory = memory; + return this; + } + + /** + * Set the number of map slot for the node. + * @param mapSlots The number of map slots for the node. + */ + public Builder setMapSlots(int mapSlots) { + node.mapSlots = mapSlots; + return this; + } + + /** + * Set the number of reduce slot for the node. 
+ * @param reduceSlots The number of reduce slots for the node. + */ + public Builder setReduceSlots(int reduceSlots) { + node.reduceSlots = reduceSlots; + return this; + } + + /** + * Set the amount of RAM reserved for each map slot. + * @param memoryPerMapSlot The amount of RAM reserved for each map slot, in KB. + */ + public Builder setMemoryPerMapSlot(long memoryPerMapSlot) { + node.memoryPerMapSlot = memoryPerMapSlot; + return this; + } + + /** + * Set the amount of RAM reserved for each reduce slot. + * @param memoryPerReduceSlot The amount of RAM reserved for each reduce slot, in KB. + */ + public Builder setMemoryPerReduceSlot(long memoryPerReduceSlot) { + node.memoryPerReduceSlot = memoryPerReduceSlot; + return this; + } + + /** + * Set the number of cores for the node. + * @param numCores Number of cores for the node. + */ + public Builder setNumCores(int numCores) { + node.numCores = numCores; + return this; + } + + /** + * Clone the settings from a reference {@link MachineNode} object. + * @param ref The reference {@link MachineNode} object. + */ + public Builder cloneFrom(MachineNode ref) { + node.memory = ref.memory; + node.mapSlots = ref.mapSlots; + node.reduceSlots = ref.reduceSlots; + node.memoryPerMapSlot = ref.memoryPerMapSlot; + node.memoryPerReduceSlot = ref.memoryPerReduceSlot; + node.numCores = ref.numCores; + return this; + } + + /** + * Build the {@link MachineNode} object. + * @return The {@link MachineNode} object being built. + */ + public MachineNode build() { + MachineNode retVal = node; + node = null; + return retVal; + } + } +} Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapTaskAttemptInfo.java Sat Sep 12 09:31:34 2009 @@ -19,8 +19,11 @@ import org.apache.hadoop.mapred.TaskStatus.State; +/** + * {@link MapTaskAttemptInfo} represents the information with regard to a + * map task attempt. + */ public class MapTaskAttemptInfo extends TaskAttemptInfo { - private long runtime; public MapTaskAttemptInfo(State state, TaskInfo taskInfo, long runtime) { Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Node.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Node.java?rev=814122&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Node.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Node.java Sat Sep 12 09:31:34 2009 @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
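A small usage sketch for the builder above; the host name, level and capacity figures are invented, and the memory values follow the KB convention documented in MachineNode:

import org.apache.hadoop.tools.rumen.MachineNode;

public class MachineNodeExample {
  public static void main(String[] args) {
    MachineNode node = new MachineNode.Builder("node-001.example.com", 2)
        .setMemory(8L * 1024 * 1024)             // 8 GB of physical RAM, in KB
        .setMapSlots(4)
        .setReduceSlots(2)
        .setMemoryPerMapSlot(1024 * 1024)        // 1 GB per map slot
        .setMemoryPerReduceSlot(2L * 1024 * 1024)
        .setNumCores(4)
        .build();

    System.out.println(node + ": " + node.getMapSlots() + " map slots, "
        + node.getReduceSlots() + " reduce slots");
  }
}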
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.util.Collections; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * {@link Node} represents a node in the cluster topology. A node can be a + * {@MachineNode}, or a {@link RackNode}, etc. + */ +public class Node implements Comparable { + private static final SortedSet EMPTY_SET = + Collections.unmodifiableSortedSet(new TreeSet()); + private Node parent; + private final String name; + private final int level; + private SortedSet children; + + /** + * @param name + * A unique name to identify a node in the cluster. + * @param level + * The level of the node in the cluster + */ + public Node(String name, int level) { + if (name == null) { + throw new IllegalArgumentException("Node name cannot be null"); + } + + if (level < 0) { + throw new IllegalArgumentException("Level cannot be negative"); + } + + this.name = name; + this.level = level; + } + + /** + * Get the name of the node. + * + * @return The name of the node. + */ + public String getName() { + return name; + } + + /** + * Get the level of the node. + * @return The level of the node. + */ + public int getLevel() { + return level; + } + + private void checkChildren() { + if (children == null) { + children = new TreeSet(); + } + } + + /** + * Add a child node to this node. + * @param child The child node to be added. The child node should currently not be belong to another cluster topology. + * @return Boolean indicating whether the node is successfully added. + */ + public synchronized boolean addChild(Node child) { + if (child.parent != null) { + throw new IllegalArgumentException( + "The child is already under another node:" + child.parent); + } + checkChildren(); + boolean retval = children.add(child); + if (retval) child.parent = this; + return retval; + } + + /** + * Does this node have any children? + * @return Boolean indicate whether this node has any children. + */ + public synchronized boolean hasChildren() { + return children != null && !children.isEmpty(); + } + + /** + * Get the children of this node. + * + * @return The children of this node. If no child, an empty set will be + * returned. The returned set is read-only. + */ + public synchronized Set getChildren() { + return (children == null) ? EMPTY_SET : + Collections.unmodifiableSortedSet(children); + } + + /** + * Get the parent node. + * @return the parent node. If root node, return null. 
+ */ + public Node getParent() { + return parent; + } + + @Override + public int hashCode() { + return name.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (obj.getClass() != this.getClass()) + return false; + Node other = (Node) obj; + return name.equals(other.name); + } + + @Override + public String toString() { + return "(" + name +", " + level +")"; + } + + @Override + public int compareTo(Node o) { + return name.compareTo(o.name); + } +} Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java Sat Sep 12 09:31:34 2009 @@ -20,7 +20,6 @@ import java.util.regex.Pattern; import java.util.regex.Matcher; -import java.io.StringReader; import java.io.InputStream; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -38,9 +37,8 @@ import org.xml.sax.SAXException; class ParsedConfigFile { - static Pattern jobIDPattern = Pattern.compile("_(job_[0-9]+_[0-9]+)_"); - - static Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])"); + private static final Pattern jobIDPattern = Pattern.compile("_(job_[0-9]+_[0-9]+)_"); + private static final Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])"); final int heapMegabytes; @@ -69,6 +67,7 @@ return oldValue; } + @SuppressWarnings("hiding") ParsedConfigFile(String filenameLine, String xmlString) { super(); @@ -121,6 +120,7 @@ NodeList fields = prop.getChildNodes(); String attr = null; String value = null; + @SuppressWarnings("unused") boolean finalParameter = false; for (int j = 0; j < fields.getLength(); j++) { Node fieldNode = fields.item(j); Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedHost.java Sat Sep 12 09:31:34 2009 @@ -23,27 +23,21 @@ import java.util.regex.Matcher; class ParsedHost { - String rackName; - String nodeName; - - private static Pattern splitPattern = Pattern - .compile("/([0-9\\\\\\.]+)/(.+)"); - - static int numberOfDistances() { - return 3; - } + private final String rackName; + private final String nodeName; /** - * @return the components, broadest first [ie., the last element is always the - * individual node name] + * TODO the following only works for /rack/host format. Change to support + * arbitrary level of network names. */ - String[] nameComponents() { - String[] result = new String[2]; + private static final Pattern splitPattern = Pattern + .compile("/([^/]+)/([^/]+)"); - result[0] = rackName; - result[1] = nodeName; - - return result; + /** + * TODO handle arbitrary level of network names. 
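A hedged sketch of assembling a tiny topology from the Node API above; the names and levels are invented, and a plain Node stands in for the rack since the RackNode constructor is not shown here. MachineNode instances are leaves (their addChild() throws), so the machine is attached under the rack:

import org.apache.hadoop.tools.rumen.MachineNode;
import org.apache.hadoop.tools.rumen.Node;

public class TopologyExample {
  public static void main(String[] args) {
    Node root = new Node("<root>", 0);
    Node rack = new Node("rack-0", 1);
    MachineNode host =
        new MachineNode.Builder("node-001.example.com", 2).build();

    root.addChild(rack);     // returns true and sets rack's parent to root
    rack.addChild(host);

    System.out.println(host.getParent());          // prints (rack-0, 1)
    System.out.println(root.getChildren().size()); // prints 1
  }
}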
+ */ + static int numberOfDistances() { + return 3; } String nameComponent(int i) throws IllegalArgumentException { @@ -65,17 +59,14 @@ return rackName.hashCode() * 17 + nodeName.hashCode(); } - ParsedHost(String name) throws IllegalArgumentException { + public static ParsedHost parse(String name) { // separate out the node name Matcher matcher = splitPattern.matcher(name); - if (!matcher.matches()) { - throw new IllegalArgumentException("Illegal node designator: \"" + name - + "\""); - } + if (!matcher.matches()) + return null; - rackName = matcher.group(1); - nodeName = matcher.group(2); + return new ParsedHost(matcher.group(1), matcher.group(2)); } public ParsedHost(LoggedLocation loc) { @@ -97,29 +88,28 @@ return result; } - - // expects the broadest name first - ParsedHost(String[] names) { - rackName = names[0]; - nodeName = names[1]; + + String getNodeName() { + return nodeName; + } + + String getRackName() { + return rackName; } - // returns the broadest name first - String[] nameList() { - String[] result = new String[2]; - - result[0] = rackName; - result[1] = nodeName; - - return result; + // expects the broadest name first + ParsedHost(String rackName, String nodeName) { + this.rackName = rackName; + this.nodeName = nodeName; } + @Override public boolean equals(Object other) { - if (other instanceof ParsedHost) { - return distance((ParsedHost) other) == 0; + if (!(other instanceof ParsedHost)) { + return false; } - - return false; + ParsedHost host = (ParsedHost) other; + return (nodeName.equals(host.nodeName) && rackName.equals(host.rackName)); } int distance(ParsedHost other) { Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java?rev=814122&r1=814121&r2=814122&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java Sat Sep 12 09:31:34 2009 @@ -20,24 +20,14 @@ import java.util.Properties; import java.util.regex.Pattern; -/** - * - */ class ParsedLine { - Properties content; - LogRecordType type; - static Pattern keyValPair = Pattern + static final Pattern keyValPair = Pattern .compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\""); - /** - * - */ - ParsedLine() { - } - + @SuppressWarnings("unused") ParsedLine(String fullLine, int version) { super();