From: amarrk@apache.org
To: mapreduce-commits@hadoop.apache.org
Date: Thu, 02 Jun 2011 03:13:51 -0000
Message-Id: <20110602031351.4A4E32388906@eris.apache.org>
Subject: svn commit: r1130393 - in /hadoop/mapreduce/trunk: ./
 src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/
 src/test/mapred/org/apache/hadoop/mapred/

Author: amarrk
Date: Thu Jun 2 03:13:50 2011
New Revision: 1130393

URL: http://svn.apache.org/viewvc?rev=1130393&view=rev
Log:
MAPREDUCE-2469. Task counters should also report the total heap usage of
the task. (Ravi Gummadi and Amar Ramesh Kamat via amarrk)

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1130393&r1=1130392&r2=1130393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jun 2 03:13:50 2011
@@ -23,6 +23,9 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    MAPREDUCE-2469. Task counters should also report the total heap usage of
+    the task. (Ravi Gummadi and Amar Ramesh Kamat via amarrk)
+
     MAPREDUCE-2544. [Gridmix] Add compression emulation system tests to Gridmix.
     (Vinay Kumar Thota via amarrk)
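The counter added in Task.java below is based on java.lang.Runtime. As a point
of reference for what "committed heap" means, here is a minimal standalone
sketch; it is illustrative only and not part of r1130393 (the class name
HeapSnapshot is made up for the example):

    // Illustrative only -- not part of this commit.
    // totalMemory() is the heap currently committed by the JVM (what the new
    // COMMITTED_HEAP_BYTES counter reports); maxMemory() is the -Xmx ceiling;
    // freeMemory() is the unused portion of the committed heap.
    public class HeapSnapshot {
      public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        long committed = rt.totalMemory();
        long max = rt.maxMemory();
        long used = committed - rt.freeMemory();
        System.out.println("committed=" + committed
            + " used=" + used + " max=" + max);
      }
    }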
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=1130393&r1=1130392&r2=1130393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Thu Jun 2 03:13:50 2011
@@ -64,7 +64,6 @@ import org.apache.hadoop.mapreduce.util.
 import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin.*;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -731,6 +730,10 @@ abstract public class Task implements Wr
    * Update resource information counters
    */
   void updateResourceCounters() {
+    // Update generic resource counters
+    updateHeapUsageCounter();
+
+    // Updating resources specified in ResourceCalculatorPlugin
     if (resourceCalculator == null) {
       return;
     }
@@ -847,6 +850,17 @@ abstract public class Task implements Wr
     updateResourceCounters();
   }
 
+  /**
+   * Updates the {@link TaskCounter#COMMITTED_HEAP_BYTES} counter to reflect the
+   * current total committed heap space usage of this JVM.
+   */
+  @SuppressWarnings("deprecation")
+  private void updateHeapUsageCounter() {
+    long currentHeapUsage = Runtime.getRuntime().totalMemory();
+    counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES)
+            .setValue(currentHeapUsage);
+  }
+
   public void done(TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, InterruptedException {
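The enum and properties changes below register COMMITTED_HEAP_BYTES as a
standard counter, so once a job finishes it can be read back like any other
TaskCounter. A hedged sketch against the org.apache.hadoop.mapreduce API (the
completed Job instance is assumed, and the class and method names here are
illustrative, not part of this commit):

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskCounter;

    // Sketch only: reads the new counter from a completed job. Counter
    // values are aggregated across the job's tasks, like other counters.
    public class HeapCounterReader {
      static long committedHeapBytes(Job job) throws Exception {
        Counter heap = job.getCounters()
                          .findCounter(TaskCounter.COMMITTED_HEAP_BYTES);
        return heap.getValue();
      }
    }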
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java?rev=1130393&r1=1130392&r2=1130393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java Thu Jun 2 03:13:50 2011
@@ -45,5 +45,6 @@ public enum TaskCounter {
   GC_TIME_MILLIS,
   CPU_MILLISECONDS,
   PHYSICAL_MEMORY_BYTES,
-  VIRTUAL_MEMORY_BYTES
+  VIRTUAL_MEMORY_BYTES,
+  COMMITTED_HEAP_BYTES
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties?rev=1130393&r1=1130392&r2=1130393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties Thu Jun 2 03:13:50 2011
@@ -31,3 +31,4 @@ SHUFFLED_MAPS.name= Shuffled
 FAILED_SHUFFLE.name= Failed Shuffles
 MERGED_MAP_OUTPUTS.name= Merged Map outputs
 GC_TIME_MILLIS.name= GC time elapsed (ms)
+COMMITTED_HEAP_BYTES.name= Total committed heap usage (bytes)
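The TaskCounter.properties entry above supplies the human-readable name shown
in job status output. The framework resolves such names through a standard
ResourceBundle keyed by the counter enum's class name; the exact resolution
path is not part of this commit, so the following standalone lookup is only an
assumption-level illustration of the mapping:

    import java.util.ResourceBundle;

    // Illustrative only: TaskCounter.properties ships alongside the
    // TaskCounter class, so the bundle name is the enum's fully
    // qualified class name.
    public class CounterDisplayName {
      public static void main(String[] args) {
        ResourceBundle bundle =
            ResourceBundle.getBundle("org.apache.hadoop.mapreduce.TaskCounter");
        // Prints: Total committed heap usage (bytes)
        System.out.println(bundle.getString("COMMITTED_HEAP_BYTES.name"));
      }
    }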
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java?rev=1130393&r1=1130392&r2=1130393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java Thu Jun 2 03:13:50 2011
@@ -19,7 +19,10 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Formatter;
+import java.util.Iterator;
+import java.util.List;
 import java.util.StringTokenizer;
 
 import org.junit.AfterClass;
@@ -27,15 +30,21 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TaskType;
 
 /**
  * This is an wordcount application that tests the count of records
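The test additions below center on a MemoryLoader helper that keeps allocating
until Runtime.getRuntime().totalMemory() crosses a target, forcing the JVM to
commit more heap. A minimal standalone sketch of the same technique, using
plain byte arrays instead of commons-lang random strings (illustrative only,
not part of this commit; run with -Xmx larger than the target, or the loop
will throw OutOfMemoryError):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: grows the committed heap past a target by holding
    // references so the GC cannot reclaim the allocations.
    public class GrowHeap {
      public static void main(String[] args) throws InterruptedException {
        long target = 256L * 1024 * 1024; // arbitrary 256 MB target
        List<byte[]> ballast = new ArrayList<byte[]>();
        while (Runtime.getRuntime().totalMemory() < target) {
          ballast.add(new byte[10 * 1024 * 1024]); // 10 MB per step
          Thread.sleep(100); // give the JVM time to resize the heap
        }
        System.out.println("committed: " + Runtime.getRuntime().totalMemory());
      }
    }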
@@ -301,6 +310,250 @@ public class TestJobCounters {
     validateCounters(c1, 147456, 25600, 102400);
   }
 
+  /**
+   * Increases the JVM's heap usage to the specified target value.
+   */
+  static class MemoryLoader {
+    private static final int DEFAULT_UNIT_LOAD_SIZE = 10 * 1024 * 1024; // 10M characters
+
+    // the target value to reach
+    private long targetValue;
+    // a list to hold the load objects
+    private List<String> loadObjects = new ArrayList<String>();
+
+    MemoryLoader(long targetValue) {
+      this.targetValue = targetValue;
+    }
+
+    /**
+     * Loads the memory to the target value.
+     */
+    void load() {
+      while (Runtime.getRuntime().totalMemory() < targetValue) {
+        System.out.println("Loading memory with " + DEFAULT_UNIT_LOAD_SIZE
+            + " characters. Current usage : "
+            + Runtime.getRuntime().totalMemory());
+        // load some objects in the memory
+        loadObjects.add(RandomStringUtils.random(DEFAULT_UNIT_LOAD_SIZE));
+
+        // sleep for 100ms
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {}
+      }
+    }
+  }
+
+  /**
+   * A mapper that increases the JVM's heap usage to a target value configured
+   * via {@link MemoryLoaderMapper#TARGET_VALUE} using a {@link MemoryLoader}.
+   */
+  @SuppressWarnings({"deprecation", "unchecked"})
+  static class MemoryLoaderMapper
+  extends MapReduceBase
+  implements Mapper<WritableComparable, Writable,
+                    WritableComparable, Writable> {
+    static final String TARGET_VALUE = "map.memory-loader.target-value";
+
+    private static MemoryLoader loader = null;
+
+    public void map(WritableComparable key, Writable val,
+                    OutputCollector<WritableComparable, Writable> output,
+                    Reporter reporter)
+    throws IOException {
+      assertNotNull("Mapper not configured!", loader);
+
+      // load the memory
+      loader.load();
+
+      // work as identity mapper
+      output.collect(key, val);
+    }
+
+    public void configure(JobConf conf) {
+      loader = new MemoryLoader(conf.getLong(TARGET_VALUE, -1));
+    }
+  }
+
+  /**
+   * A reducer that increases the JVM's heap usage to a target value configured
+   * via {@link MemoryLoaderReducer#TARGET_VALUE} using a {@link MemoryLoader}.
+   */
+  @SuppressWarnings({"deprecation", "unchecked"})
+  static class MemoryLoaderReducer extends MapReduceBase
+  implements Reducer<WritableComparable, Writable,
+                     WritableComparable, Writable> {
+    static final String TARGET_VALUE = "reduce.memory-loader.target-value";
+    private static MemoryLoader loader = null;
+
+    public void reduce(WritableComparable key, Iterator<Writable> val,
+                       OutputCollector<WritableComparable, Writable> output,
+                       Reporter reporter)
+    throws IOException {
+      assertNotNull("Reducer not configured!", loader);
+
+      // load the memory
+      loader.load();
+
+      // work as identity reducer
+      output.collect(key, key);
+    }
+
+    public void configure(JobConf conf) {
+      loader = new MemoryLoader(conf.getLong(TARGET_VALUE, -1));
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  private long getTaskCounterUsage(JobClient client, JobID id, int numReports,
+                                   int taskId, TaskType type)
+  throws Exception {
+    TaskReport[] reports = null;
+    if (TaskType.MAP.equals(type)) {
+      reports = client.getMapTaskReports(id);
+    } else if (TaskType.REDUCE.equals(type)) {
+      reports = client.getReduceTaskReports(id);
+    }
+
+    assertNotNull("No reports found for task type '" + type.name()
+                  + "' in job " + id, reports);
+    // make sure that the total number of reports match the expected
+    assertEquals("Mismatch in the number of task reports", numReports,
+                 reports.length);
+
+    Counters counters = reports[taskId].getCounters();
+
+    return counters.getCounter(TaskCounter.COMMITTED_HEAP_BYTES);
+  }
+
+  // set up heap options, target value for memory loader and the output
+  // directory before running the job
+  @SuppressWarnings("deprecation")
+  private static RunningJob runHeapUsageTestJob(JobConf conf, Path testRootDir,
+                              String heapOptions, long targetMapValue,
+                              long targetReduceValue, FileSystem fs,
+                              JobClient client, Path inDir)
+  throws IOException {
+    // define a job
+    JobConf jobConf = new JobConf(conf);
+
+    // configure the jobs
+    jobConf.setNumMapTasks(1);
+    jobConf.setNumReduceTasks(1);
+    jobConf.setMapperClass(MemoryLoaderMapper.class);
+    jobConf.setReducerClass(MemoryLoaderReducer.class);
+    jobConf.setInputFormat(TextInputFormat.class);
+    jobConf.setOutputKeyClass(LongWritable.class);
+    jobConf.setOutputValueClass(Text.class);
+    jobConf.setMaxMapAttempts(1);
+    jobConf.setMaxReduceAttempts(1);
+    jobConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, heapOptions);
+
+    // set the targets
+    jobConf.setLong(MemoryLoaderMapper.TARGET_VALUE, targetMapValue);
+    jobConf.setLong(MemoryLoaderReducer.TARGET_VALUE, targetReduceValue);
+
+    // set the input directory for the job
+    FileInputFormat.setInputPaths(jobConf, inDir);
+
+    // define job output folder
+    Path outDir = new Path(testRootDir, "out");
+    fs.delete(outDir, true);
+    FileOutputFormat.setOutputPath(jobConf, outDir);
+
+    // run the job
+    RunningJob job = client.submitJob(jobConf);
+    job.waitForCompletion();
+    JobID jobID = job.getID();
+    assertTrue("Job " + jobID + " failed!", job.isSuccessful());
+
+    return job;
+  }
+
+  /**
+   * Tests {@link TaskCounter}'s {@link TaskCounter#COMMITTED_HEAP_BYTES}.
+   * The test runs a low-memory job and then a high-memory job, and checks
+   * that the COMMITTED_HEAP_BYTES reported for the low-memory job is smaller
+   * than the value reported for the high-memory job.
+   * @throws Exception
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testHeapUsageCounter() throws Exception {
+    JobConf conf = new JobConf();
+    // create a local filesystem handle
+    FileSystem fileSystem = FileSystem.getLocal(conf);
+
+    // define test root directories
+    Path rootDir =
+      new Path(System.getProperty("test.build.data", "/tmp"));
+    Path testRootDir = new Path(rootDir, "testHeapUsageCounter");
+    // cleanup the test root directory
+    fileSystem.delete(testRootDir, true);
+    // set the current working directory
+    fileSystem.setWorkingDirectory(testRootDir);
+
+    fileSystem.deleteOnExit(testRootDir);
+
+    // create a mini cluster using the local file system
+    MiniMRCluster mrCluster =
+      new MiniMRCluster(1, fileSystem.getUri().toString(), 1);
+
+    try {
+      conf = mrCluster.createJobConf();
+      JobClient jobClient = new JobClient(conf);
+
+      // define job input
+      Path inDir = new Path(testRootDir, "in");
+      // create input data
+      createWordsFile(inDir, conf);
+
+      // configure and run a low memory job which will run without loading the
+      // jvm's heap
+      RunningJob lowMemJob =
+        runHeapUsageTestJob(conf, testRootDir, "-Xms32m -Xmx1G",
+                            0, 0, fileSystem, jobClient, inDir);
+      JobID lowMemJobID = lowMemJob.getID();
+      long lowMemJobMapHeapUsage = getTaskCounterUsage(jobClient, lowMemJobID,
+                                                       1, 0, TaskType.MAP);
+      System.out.println("Job1 (low memory job) map task heap usage: "
+                         + lowMemJobMapHeapUsage);
+      long lowMemJobReduceHeapUsage =
+        getTaskCounterUsage(jobClient, lowMemJobID, 1, 0, TaskType.REDUCE);
+      System.out.println("Job1 (low memory job) reduce task heap usage: "
+                         + lowMemJobReduceHeapUsage);
+
+      // configure and run a high memory job which will load the jvm's heap
+      RunningJob highMemJob =
+        runHeapUsageTestJob(conf, testRootDir, "-Xms32m -Xmx1G",
+                            lowMemJobMapHeapUsage + 256*1024*1024,
+                            lowMemJobReduceHeapUsage + 256*1024*1024,
+                            fileSystem, jobClient, inDir);
+      JobID highMemJobID = highMemJob.getID();
+
+      long highMemJobMapHeapUsage = getTaskCounterUsage(jobClient,
+                                                        highMemJobID,
+                                                        1, 0, TaskType.MAP);
+      System.out.println("Job2 (high memory job) map task heap usage: "
+                         + highMemJobMapHeapUsage);
+      long highMemJobReduceHeapUsage =
+        getTaskCounterUsage(jobClient, highMemJobID, 1, 0, TaskType.REDUCE);
+      System.out.println("Job2 (high memory job) reduce task heap usage: "
+                         + highMemJobReduceHeapUsage);
+
+      assertTrue("Incorrect map heap usage reported by the map task",
+                 lowMemJobMapHeapUsage < highMemJobMapHeapUsage);
+
+      assertTrue("Incorrect reduce heap usage reported by the reduce task",
+                 lowMemJobReduceHeapUsage < highMemJobReduceHeapUsage);
+    } finally {
+      // shutdown the mr cluster
+      mrCluster.shutdown();
+      try {
+        fileSystem.delete(testRootDir, true);
+      } catch (IOException ioe) {}
+    }
+  }
+
   public static class NewMapTokenizer
       extends org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable> {
     private final static IntWritable one = new IntWritable(1);