hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r937737 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/ src/test/mapred/org/apache/hadoop/mapreduce/
Date Sun, 25 Apr 2010 02:39:36 GMT
Author: cdouglas
Date: Sun Apr 25 02:39:35 2010
New Revision: 937737

URL: http://svn.apache.org/viewvc?rev=937737&view=rev
Log:
MAPREDUCE-1304. Add a task counter tracking time spent in GC. Contributed by Aaron Kimball

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=937737&r1=937736&r2=937737&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Apr 25 02:39:35 2010
@@ -264,6 +264,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1065. Update mapred tutorial to use the new API. (Aaron Kimball
     via cdouglas)
 
+    MAPREDUCE-1304. Add a task counter tracking time spent in GC. (Aaron
+    Kimball via cdouglas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=937737&r1=937736&r2=937737&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Sun Apr 25 02:39:35 2010
@@ -21,13 +21,17 @@ package org.apache.hadoop.mapred;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
 import java.text.NumberFormat;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+
 import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
@@ -46,6 +50,7 @@ import org.apache.hadoop.io.WritableUtil
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.JobStatus;
@@ -140,6 +145,7 @@ abstract public class Task implements Wr
   private int numSlotsRequired;
   protected TaskUmbilicalProtocol umbilical;
   protected SecretKey tokenSecret;
+  protected GcTimeUpdater gcUpdater;
 
   ////////////////////////////////////////////
   // Constructors
@@ -154,6 +160,7 @@ abstract public class Task implements Wr
       counters.findCounter(TaskCounter.FAILED_SHUFFLE);
     mergedMapOutputsCounter = 
       counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+    gcUpdater = new GcTimeUpdater();
   }
 
   public Task(String jobFile, TaskAttemptID taskId, int partition, 
@@ -175,6 +182,7 @@ abstract public class Task implements Wr
     failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE);
     mergedMapOutputsCounter = 
       counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+    gcUpdater = new GcTimeUpdater();
   }
 
   ////////////////////////////////////////////
@@ -499,6 +507,7 @@ abstract public class Task implements Wr
     private InputSplit split = null;
     private Progress taskProgress;
     private Thread pingThread = null;
+
     /**
      * flag that indicates whether progress update needs to be sent to parent.
      * If true, it has been set. If false, it has been reset. 
@@ -511,6 +520,7 @@ abstract public class Task implements Wr
       this.umbilical = umbilical;
       this.taskProgress = taskProgress;
     }
+
     // getters and setters for flag
     void setProgressFlag() {
       progressFlag.set(true);
@@ -680,6 +690,48 @@ abstract public class Task implements Wr
   }
 
   /**
+   * An updater that tracks the amount of time this task has spent in GC.
+   */
+  class GcTimeUpdater {
+    private long lastGcMillis = 0;
+    private List<GarbageCollectorMXBean> gcBeans = null;
+
+    public GcTimeUpdater() {
+      this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+      getElapsedGc(); // Initialize 'lastGcMillis' with the current time spent.
+    }
+
+    /**
+     * @return the number of milliseconds that the gc has used for CPU
+     * since the last time this method was called.
+     */
+    protected long getElapsedGc() {
+      long thisGcMillis = 0;
+      for (GarbageCollectorMXBean gcBean : gcBeans) {
+        thisGcMillis += gcBean.getCollectionTime();
+      }
+
+      long delta = thisGcMillis - lastGcMillis;
+      this.lastGcMillis = thisGcMillis;
+      return delta;
+    }
+
+    /**
+     * Increment the gc-elapsed-time counter.
+     */
+    public void incrementGcCounter() {
+      if (null == counters) {
+        return; // nothing to do.
+      }
+
+      Counter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS);
+      if (null != gcCounter) {
+        gcCounter.increment(getElapsedGc());
+      }
+    }
+  }
+
+  /**
    * An updater that tracks the last number reported for a given file
    * system and only creates the counters when they are needed.
    */
@@ -734,6 +786,8 @@ abstract public class Task implements Wr
       }
       updater.updateCounters();      
     }
+
+    gcUpdater.incrementGcCounter();
   }
 
   public void done(TaskUmbilicalProtocol umbilical,
@@ -767,6 +821,9 @@ abstract public class Task implements Wr
     }
     taskDone.set(true);
     reporter.stopCommunicationThread();
+    // Make sure we send at least one set of counter increments. It's
+    // ok to call updateCounters() in this thread after comm thread stopped.
+    updateCounters();
     sendLastUpdate(umbilical);
     //signal the tasktracker that we are done
     sendDone(umbilical);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java?rev=937737&r1=937736&r2=937737&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java Sun Apr 25 02:39:35 2010
@@ -36,5 +36,6 @@ public enum TaskCounter {
   SPILLED_RECORDS,
   SHUFFLED_MAPS, 
   FAILED_SHUFFLE,
-  MERGED_MAP_OUTPUTS
+  MERGED_MAP_OUTPUTS,
+  GC_TIME_MILLIS
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties?rev=937737&r1=937736&r2=937737&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties Sun Apr 25 02:39:35 2010
@@ -30,3 +30,4 @@ SPILLED_RECORDS.name=          Spilled R
 SHUFFLED_MAPS.name=            Shuffled Maps 
 FAILED_SHUFFLE.name=           Failed Shuffles
 MERGED_MAP_OUTPUTS.name=       Merged Map outputs
+GC_TIME_MILLIS.name=           GC time elapsed (ms)

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java?rev=937737&r1=937736&r2=937737&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java Sun Apr 25 02:39:35 2010
@@ -142,6 +142,31 @@ public class TestLocalRunner extends Tes
     }
   }
 
+  private static class GCMapper
+      extends Mapper<LongWritable, Text, LongWritable, Text> {
+    public void map(LongWritable key, Text val, Context c)
+        throws IOException, InterruptedException {
+
+      // Create a whole bunch of objects.
+      List<Integer> lst = new ArrayList<Integer>();
+      for (int i = 0; i < 20000; i++) {
+        lst.add(new Integer(i));
+      }
+
+      // Actually use this list, to ensure that it isn't just optimized away.
+      int sum = 0;
+      for (int x : lst) {
+        sum += x;
+      }
+
+      // throw away the list and run a GC.
+      lst = null;
+      System.gc();
+
+      c.write(new LongWritable(sum), val);
+    }
+  }
+
   /**
    * Create a single input file in the input directory.
    * @param dirPath the directory in which the file resides
@@ -245,6 +270,51 @@ public class TestLocalRunner extends Tes
   }
 
   /**
+   * Test that the GC counter actually increments when we know that we've
+   * spent some time in the GC during the mapper.
+   */
+  @Test
+  public void testGcCounter() throws Exception {
+    Path inputPath = getInputPath();
+    Path outputPath = getOutputPath();
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+
+    // Clear input/output dirs.
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    if (fs.exists(inputPath)) {
+      fs.delete(inputPath, true);
+    }
+
+    // Create one input file
+    createInputFile(inputPath, 0, 20);
+
+    // Now configure and run the job.
+    Job job = new Job();
+    job.setMapperClass(GCMapper.class);
+    job.setNumReduceTasks(0);
+    job.getConfiguration().set("io.sort.mb", "25");
+    FileInputFormat.addInputPath(job, inputPath);
+    FileOutputFormat.setOutputPath(job, outputPath);
+
+    boolean ret = job.waitForCompletion(true);
+    assertTrue("job failed", ret);
+
+    // This job should have done *some* gc work.
+    // It had to clean up 400,000 objects.
+    // We strongly suspect this will result in a few milliseconds effort.
+    Counter gcCounter = job.getCounters().findCounter(
+        TaskCounter.GC_TIME_MILLIS);
+    assertNotNull(gcCounter);
+    assertTrue("No time spent in gc", gcCounter.getValue() > 0);
+  }
+
+
+  /**
    * Run a test with several mappers in parallel, operating at different
    * speeds. Verify that the correct amount of output is created.
    */



Mime
View raw message