hadoop-mapreduce-commits mailing list archives

From: ama...@apache.org
Subject: svn commit: r1130393 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/test/mapred/org/apache/hadoop/mapred/
Date: Thu, 02 Jun 2011 03:13:51 GMT
Author: amarrk
Date: Thu Jun  2 03:13:50 2011
New Revision: 1130393

URL: http://svn.apache.org/viewvc?rev=1130393&view=rev
Log:
MAPREDUCE-2469. Task counters should also report the total heap usage of the task. (Ravi Gummadi
and Amar Ramesh Kamat via amarrk)
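
In effect, each task now publishes its committed heap size as a regular counter, readable from a finished job like any other TaskCounter. A minimal sketch of such a read, assuming "job" is a handle to a completed org.apache.hadoop.mapreduce.Job:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskCounter;

    class CommittedHeapRead {
      // "job" is assumed to be a completed job; the value is the aggregate
      // across the job's tasks, like any other TaskCounter.
      static long committedHeapBytes(Job job) throws Exception {
        return job.getCounters()
                  .findCounter(TaskCounter.COMMITTED_HEAP_BYTES)
                  .getValue();
      }
    }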

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1130393&r1=1130392&r2=1130393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jun  2 03:13:50 2011
@@ -23,6 +23,9 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    MAPREDUCE-2469. Task counters should also report the total heap usage of 
+    the task. (Ravi Gummadi and Amar Ramesh Kamat via amarrk)
+
     MAPREDUCE-2544. [Gridmix] Add compression emulation system tests to 
     Gridmix. (Vinay Kumar Thota via amarrk)
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=1130393&r1=1130392&r2=1130393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Thu Jun  2 03:13:50 2011
@@ -64,7 +64,6 @@ import org.apache.hadoop.mapreduce.util.
 import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin.*;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -731,6 +730,10 @@ abstract public class Task implements Wr
    * Update resource information counters
    */
   void updateResourceCounters() {
+    // Update generic resource counters
+    updateHeapUsageCounter();
+
+    // Updating resources specified in ResourceCalculatorPlugin
     if (resourceCalculator == null) {
       return;
     }
@@ -847,6 +850,17 @@ abstract public class Task implements Wr
     updateResourceCounters();
   }
 
+  /**
+   * Updates the {@link TaskCounter#COMMITTED_HEAP_BYTES} counter to reflect the
+   * current total committed heap space usage of this JVM.
+   */
+  @SuppressWarnings("deprecation")
+  private void updateHeapUsageCounter() {
+    long currentHeapUsage = Runtime.getRuntime().totalMemory();
+    counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES)
+            .setValue(currentHeapUsage);
+  }
+
   public void done(TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, InterruptedException {
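
The new updateHeapUsageCounter() reads Runtime.totalMemory(), which is the heap currently committed to the JVM (between -Xms and -Xmx), not the live-object footprint, so the counter can exceed what the task's objects actually occupy. A standalone probe illustrating the distinction:

    public class HeapProbe {
      public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        // committed heap: the value COMMITTED_HEAP_BYTES is set from
        System.out.println("totalMemory (committed) : " + rt.totalMemory());
        // ceiling the heap may grow to (-Xmx)
        System.out.println("maxMemory   (limit)     : " + rt.maxMemory());
        // committed-but-unused portion of the heap
        System.out.println("freeMemory  (unused)    : " + rt.freeMemory());
      }
    }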

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java?rev=1130393&r1=1130392&r2=1130393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java Thu Jun  2 03:13:50 2011
@@ -45,5 +45,6 @@ public enum TaskCounter {
   GC_TIME_MILLIS,
   CPU_MILLISECONDS,
   PHYSICAL_MEMORY_BYTES,
-  VIRTUAL_MEMORY_BYTES
+  VIRTUAL_MEMORY_BYTES,
+  COMMITTED_HEAP_BYTES
 }
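
The counter is also visible per task through the old org.apache.hadoop.mapred API, which is what the test change below exercises. A sketch of that retrieval path, assuming "client" is a JobClient and "id" the JobID of a completed job:

    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.TaskReport;
    import org.apache.hadoop.mapreduce.TaskCounter;

    class PerTaskHeap {
      // Reads the first map task's committed heap from its task report;
      // "client" and "id" are assumed inputs.
      static long firstMapCommittedHeap(JobClient client, JobID id)
          throws Exception {
        TaskReport[] maps = client.getMapTaskReports(id);
        Counters counters = maps[0].getCounters();
        return counters.getCounter(TaskCounter.COMMITTED_HEAP_BYTES);
      }
    }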

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties?rev=1130393&r1=1130392&r2=1130393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties Thu Jun  2 03:13:50 2011
@@ -31,3 +31,4 @@ SHUFFLED_MAPS.name=            Shuffled 
 FAILED_SHUFFLE.name=           Failed Shuffles
 MERGED_MAP_OUTPUTS.name=       Merged Map outputs
 GC_TIME_MILLIS.name=           GC time elapsed (ms)
+COMMITTED_HEAP_BYTES.name=     Total committed heap usage (bytes)
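
The .name entries in this bundle supply the human-readable labels shown alongside job counters; the bundle is resolved by the counter enum's fully-qualified class name. A small illustration of that lookup with plain java.util.ResourceBundle (the actual resolution path is Hadoop-internal, so treat this as an approximation):

    import java.util.ResourceBundle;

    class CounterLabel {
      public static void main(String[] args) {
        // Bundle name mirrors the enum's fully-qualified class name;
        // keys are "<COUNTER>.name" as in TaskCounter.properties above.
        ResourceBundle bundle = ResourceBundle.getBundle(
            "org.apache.hadoop.mapreduce.TaskCounter");
        System.out.println(bundle.getString("COMMITTED_HEAP_BYTES.name"));
        // expected: "Total committed heap usage (bytes)"
      }
    }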

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java?rev=1130393&r1=1130392&r2=1130393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobCounters.java Thu Jun  2 03:13:50 2011
@@ -19,7 +19,10 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Formatter;
+import java.util.Iterator;
+import java.util.List;
 import java.util.StringTokenizer;
 
 import org.junit.AfterClass;
@@ -27,15 +30,21 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TaskType;
 
 /**
  * This is an wordcount application that tests the count of records
@@ -301,6 +310,250 @@ public class TestJobCounters {
     validateCounters(c1, 147456, 25600, 102400);
   }
 
+  /** 
+   * Increases the JVM's heap usage to the specified target value.
+   */
+  static class MemoryLoader {
+    private static final int DEFAULT_UNIT_LOAD_SIZE = 10 * 1024 * 1024; // 10mb
+    
+    // the target value to reach
+    private long targetValue;
+    // a list to hold the load objects
+    private List<String> loadObjects = new ArrayList<String>();
+    
+    MemoryLoader(long targetValue) {
+      this.targetValue = targetValue;
+    }
+    
+    /**
+     * Loads the memory to the target value.
+     */
+    void load() {
+      while (Runtime.getRuntime().totalMemory() < targetValue) {
+        System.out.println("Loading memory with " + DEFAULT_UNIT_LOAD_SIZE 
+                           + " characters. Current usage : " 
+                           + Runtime.getRuntime().totalMemory());
+        // load some objects in the memory
+        loadObjects.add(RandomStringUtils.random(DEFAULT_UNIT_LOAD_SIZE));
+
+        // sleep for 100ms
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {}
+      }
+    }
+  }
+  
+  /**
+   * A mapper that increases the JVM's heap usage to a target value configured 
+   * via {@link MemoryLoaderMapper#TARGET_VALUE} using a {@link MemoryLoader}.
+   */
+  @SuppressWarnings({"deprecation", "unchecked"})
+  static class MemoryLoaderMapper 
+  extends MapReduceBase 
+  implements Mapper<WritableComparable, Writable, 
+                    WritableComparable, Writable> {
+    static final String TARGET_VALUE = "map.memory-loader.target-value";
+    
+    private static MemoryLoader loader = null;
+    
+    public void map(WritableComparable key, Writable val, 
+                    OutputCollector<WritableComparable, Writable> output,
+                    Reporter reporter)
+    throws IOException {
+      assertNotNull("Mapper not configured!", loader);
+      
+      // load the memory
+      loader.load();
+      
+      // work as identity mapper
+      output.collect(key, val);
+    }
+
+    public void configure(JobConf conf) {
+      loader = new MemoryLoader(conf.getLong(TARGET_VALUE, -1));
+    }
+  }
+  
+  /** 
+   * A reducer that increases the JVM's heap usage to a target value configured 
+   * via {@link MemoryLoaderReducer#TARGET_VALUE} using a {@link MemoryLoader}.
+   */
+  @SuppressWarnings({"deprecation", "unchecked"})
+  static class MemoryLoaderReducer extends MapReduceBase 
+  implements Reducer<WritableComparable, Writable, 
+                     WritableComparable, Writable> {
+    static final String TARGET_VALUE = "reduce.memory-loader.target-value";
+    private static MemoryLoader loader = null;
+    
+    public void reduce(WritableComparable key, Iterator<Writable> val, 
+                       OutputCollector<WritableComparable, Writable> output,
+                       Reporter reporter)
+    throws IOException {
+      assertNotNull("Reducer not configured!", loader);
+      
+      // load the memory
+      loader.load();
+      
+      // work as identity reducer
+      output.collect(key, key);
+    }
+
+    public void configure(JobConf conf) {
+      loader = new MemoryLoader(conf.getLong(TARGET_VALUE, -1));
+    }
+  }
+  
+  @SuppressWarnings("deprecation")
+  private long getTaskCounterUsage (JobClient client, JobID id, int numReports,
+                                    int taskId, TaskType type) 
+  throws Exception {
+    TaskReport[] reports = null;
+    if (TaskType.MAP.equals(type)) {
+      reports = client.getMapTaskReports(id);
+    } else if (TaskType.REDUCE.equals(type)) {
+      reports = client.getReduceTaskReports(id);
+    }
+    
+    assertNotNull("No reports found for task type '" + type.name() 
+                  + "' in job " + id, reports);
+    // make sure that the total number of reports match the expected
+    assertEquals("Mismatch in task id", numReports, reports.length);
+    
+    Counters counters = reports[taskId].getCounters();
+    
+    return counters.getCounter(TaskCounter.COMMITTED_HEAP_BYTES);
+  }
+  
+  // set up heap options, target value for memory loader and the output 
+  // directory before running the job
+  @SuppressWarnings("deprecation")
+  private static RunningJob runHeapUsageTestJob(JobConf conf, Path testRootDir,
+                              String heapOptions, long targetMapValue,
+                              long targetReduceValue, FileSystem fs, 
+                              JobClient client, Path inDir) 
+  throws IOException {
+    // define a job
+    JobConf jobConf = new JobConf(conf);
+    
+    // configure the jobs
+    jobConf.setNumMapTasks(1);
+    jobConf.setNumReduceTasks(1);
+    jobConf.setMapperClass(MemoryLoaderMapper.class);
+    jobConf.setReducerClass(MemoryLoaderReducer.class);
+    jobConf.setInputFormat(TextInputFormat.class);
+    jobConf.setOutputKeyClass(LongWritable.class);
+    jobConf.setOutputValueClass(Text.class);
+    jobConf.setMaxMapAttempts(1);
+    jobConf.setMaxReduceAttempts(1);
+    jobConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, heapOptions);
+    
+    // set the targets
+    jobConf.setLong(MemoryLoaderMapper.TARGET_VALUE, targetMapValue);
+    jobConf.setLong(MemoryLoaderReducer.TARGET_VALUE, targetReduceValue);
+    
+    // set the input directory for the job
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    
+    // define job output folder
+    Path outDir = new Path(testRootDir, "out");
+    fs.delete(outDir, true);
+    FileOutputFormat.setOutputPath(jobConf, outDir);
+    
+    // run the job
+    RunningJob job = client.submitJob(jobConf);
+    job.waitForCompletion();
+    JobID jobID = job.getID();
+    assertTrue("Job " + jobID + " failed!", job.isSuccessful());
+    
+    return job;
+  }
+  
+  /**
+   * Tests {@link TaskCounter}'s {@link TaskCounter.COMMITTED_HEAP_BYTES}. 
+   * The test consists of running a low-memory job which consumes less heap 
+   * memory and then running a high-memory job which consumes more heap memory, 
+   * and then ensuring that COMMITTED_HEAP_BYTES of low-memory job is smaller 
+   * than that of the high-memory job.
+   * @throws IOException
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testHeapUsageCounter() throws Exception {
+    JobConf conf = new JobConf();
+    // create a local filesystem handle
+    FileSystem fileSystem = FileSystem.getLocal(conf);
+    
+    // define test root directories
+    Path rootDir =
+      new Path(System.getProperty("test.build.data", "/tmp"));
+    Path testRootDir = new Path(rootDir, "testHeapUsageCounter");
+    // cleanup the test root directory
+    fileSystem.delete(testRootDir, true);
+    // set the current working directory
+    fileSystem.setWorkingDirectory(testRootDir);
+    
+    fileSystem.deleteOnExit(testRootDir);
+    
+    // create a mini cluster using the local file system
+    MiniMRCluster mrCluster = 
+      new MiniMRCluster(1, fileSystem.getUri().toString(), 1);
+    
+    try {
+      conf = mrCluster.createJobConf();
+      JobClient jobClient = new JobClient(conf);
+
+      // define job input
+      Path inDir = new Path(testRootDir, "in");
+      // create input data
+      createWordsFile(inDir, conf);
+
+      // configure and run a low memory job which will run without loading the
+      // jvm's heap
+      RunningJob lowMemJob = 
+        runHeapUsageTestJob(conf, testRootDir, "-Xms32m -Xmx1G", 
+                            0, 0, fileSystem, jobClient, inDir);
+      JobID lowMemJobID = lowMemJob.getID();
+      long lowMemJobMapHeapUsage = getTaskCounterUsage(jobClient, lowMemJobID, 
+                                                       1, 0, TaskType.MAP);
+      System.out.println("Job1 (low memory job) map task heap usage: " 
+                         + lowMemJobMapHeapUsage);
+      long lowMemJobReduceHeapUsage =
+        getTaskCounterUsage(jobClient, lowMemJobID, 1, 0, TaskType.REDUCE);
+      System.out.println("Job1 (low memory job) reduce task heap usage: " 
+                         + lowMemJobReduceHeapUsage);
+
+      // configure and run a high memory job which will load the jvm's heap
+      RunningJob highMemJob = 
+        runHeapUsageTestJob(conf, testRootDir, "-Xms32m -Xmx1G", 
+                            lowMemJobMapHeapUsage + 256*1024*1024, 
+                            lowMemJobReduceHeapUsage + 256*1024*1024,
+                            fileSystem, jobClient, inDir);
+      JobID highMemJobID = highMemJob.getID();
+
+      long highMemJobMapHeapUsage = getTaskCounterUsage(jobClient, highMemJobID,
+                                                        1, 0, TaskType.MAP);
+      System.out.println("Job2 (high memory job) map task heap usage: " 
+                         + highMemJobMapHeapUsage);
+      long highMemJobReduceHeapUsage =
+        getTaskCounterUsage(jobClient, highMemJobID, 1, 0, TaskType.REDUCE);
+      System.out.println("Job2 (high memory job) reduce task heap usage: " 
+                         + highMemJobReduceHeapUsage);
+
+      assertTrue("Incorrect map heap usage reported by the map task", 
+                 lowMemJobMapHeapUsage < highMemJobMapHeapUsage);
+
+      assertTrue("Incorrect reduce heap usage reported by the reduce task", 
+                 lowMemJobReduceHeapUsage < highMemJobReduceHeapUsage);
+    } finally {
+      // shutdown the mr cluster
+      mrCluster.shutdown();
+      try {
+        fileSystem.delete(testRootDir, true);
+      } catch (IOException ioe) {} 
+    }
+  }
+
   public static class NewMapTokenizer
       extends org.apache.hadoop.mapreduce.Mapper<Object,Text,Text,IntWritable> {
     private final static IntWritable one = new IntWritable(1);


