Author: cdouglas
Date: Sun Apr 25 02:39:35 2010
New Revision: 937737
URL: http://svn.apache.org/viewvc?rev=937737&view=rev
Log:
MAPREDUCE-1304. Add a task counter tracking time spent in GC. Contributed by Aaron Kimball
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=937737&r1=937736&r2=937737&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Apr 25 02:39:35 2010
@@ -264,6 +264,9 @@ Trunk (unreleased changes)
MAPREDUCE-1065. Update mapred tutorial to use the new API. (Aaron Kimball
via cdouglas)
+ MAPREDUCE-1304. Add a task counter tracking time spent in GC. (Aaron
+ Kimball via cdouglas)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=937737&r1=937736&r2=937737&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Sun Apr 25 02:39:35 2010
@@ -21,13 +21,17 @@ package org.apache.hadoop.mapred;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
+
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
@@ -46,6 +50,7 @@ import org.apache.hadoop.io.WritableUtil
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.JobStatus;
@@ -140,6 +145,7 @@ abstract public class Task implements Wr
private int numSlotsRequired;
protected TaskUmbilicalProtocol umbilical;
protected SecretKey tokenSecret;
+ protected GcTimeUpdater gcUpdater;
////////////////////////////////////////////
// Constructors
@@ -154,6 +160,7 @@ abstract public class Task implements Wr
counters.findCounter(TaskCounter.FAILED_SHUFFLE);
mergedMapOutputsCounter =
counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+ gcUpdater = new GcTimeUpdater();
}
public Task(String jobFile, TaskAttemptID taskId, int partition,
@@ -175,6 +182,7 @@ abstract public class Task implements Wr
failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE);
mergedMapOutputsCounter =
counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+ gcUpdater = new GcTimeUpdater();
}
////////////////////////////////////////////
@@ -499,6 +507,7 @@ abstract public class Task implements Wr
private InputSplit split = null;
private Progress taskProgress;
private Thread pingThread = null;
+
/**
* flag that indicates whether progress update needs to be sent to parent.
* If true, it has been set. If false, it has been reset.
@@ -511,6 +520,7 @@ abstract public class Task implements Wr
this.umbilical = umbilical;
this.taskProgress = taskProgress;
}
+
// getters and setters for flag
void setProgressFlag() {
progressFlag.set(true);
@@ -680,6 +690,48 @@ abstract public class Task implements Wr
}
/**
+ * An updater that tracks the amount of time this task has spent in GC.
+ */
+ class GcTimeUpdater {
+ private long lastGcMillis = 0;
+ private List<GarbageCollectorMXBean> gcBeans = null;
+
+ public GcTimeUpdater() {
+ this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
+ getElapsedGc(); // Initialize 'lastGcMillis' with the current time spent.
+ }
+
+ /**
+ * @return the approximate elapsed milliseconds spent in gc, summed over all
+ * collectors, since the last time this method was called.
+ */
+ protected long getElapsedGc() {
+ long thisGcMillis = 0;
+ for (GarbageCollectorMXBean gcBean : gcBeans) {
+ thisGcMillis += Math.max(0L, gcBean.getCollectionTime()); // -1 means undefined; count as 0.
+ }
+
+ long delta = thisGcMillis - lastGcMillis;
+ this.lastGcMillis = thisGcMillis;
+ return delta;
+ }
+
+ /**
+ * Add the gc time elapsed since the previous call to the GC_TIME_MILLIS counter.
+ */
+ public void incrementGcCounter() {
+ if (null == counters) {
+ return; // nothing to do.
+ }
+
+ Counter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS);
+ if (null != gcCounter) {
+ gcCounter.increment(getElapsedGc());
+ }
+ }
+ }
+
+ /**
* An updater that tracks the last number reported for a given file
* system and only creates the counters when they are needed.
*/
@@ -734,6 +786,8 @@ abstract public class Task implements Wr
}
updater.updateCounters();
}
+
+ gcUpdater.incrementGcCounter();
}
public void done(TaskUmbilicalProtocol umbilical,
@@ -767,6 +821,9 @@ abstract public class Task implements Wr
}
taskDone.set(true);
reporter.stopCommunicationThread();
+ // Make sure we send at least one set of counter increments. It's
+ // ok to call updateCounters() in this thread after comm thread stopped.
+ updateCounters();
sendLastUpdate(umbilical);
//signal the tasktracker that we are done
sendDone(umbilical);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java?rev=937737&r1=937736&r2=937737&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java Sun Apr 25 02:39:35 2010
@@ -36,5 +36,6 @@ public enum TaskCounter {
SPILLED_RECORDS,
SHUFFLED_MAPS,
FAILED_SHUFFLE,
- MERGED_MAP_OUTPUTS
+ MERGED_MAP_OUTPUTS,
+ GC_TIME_MILLIS
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties?rev=937737&r1=937736&r2=937737&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.properties Sun Apr 25 02:39:35 2010
@@ -30,3 +30,4 @@ SPILLED_RECORDS.name= Spilled R
SHUFFLED_MAPS.name= Shuffled Maps
FAILED_SHUFFLE.name= Failed Shuffles
MERGED_MAP_OUTPUTS.name= Merged Map outputs
+GC_TIME_MILLIS.name= GC time elapsed (ms)
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java?rev=937737&r1=937736&r2=937737&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestLocalRunner.java Sun Apr 25 02:39:35 2010
@@ -142,6 +142,31 @@ public class TestLocalRunner extends Tes
}
}
+ private static class GCMapper
+ extends Mapper<LongWritable, Text, LongWritable, Text> { // Mapper that churns the heap and requests a GC on every record.
+ public void map(LongWritable key, Text val, Context c)
+ throws IOException, InterruptedException {
+
+ // Allocate 20,000 boxed Integers per map() call so there is garbage to collect.
+ List<Integer> lst = new ArrayList<Integer>();
+ for (int i = 0; i < 20000; i++) {
+ lst.add(new Integer(i));
+ }
+
+ // Actually use this list, to ensure that it isn't just optimized away.
+ int sum = 0;
+ for (int x : lst) {
+ sum += x;
+ }
+
+ // Drop the only reference to the list and explicitly request a GC.
+ lst = null;
+ System.gc();
+
+ c.write(new LongWritable(sum), val); // Emit the sum as the key; the input value passes through unchanged.
+ }
+ }
+
/**
* Create a single input file in the input directory.
* @param dirPath the directory in which the file resides
@@ -245,6 +270,51 @@ public class TestLocalRunner extends Tes
}
/**
+ * Test that the GC counter actually increments when we know that we've
+ * spent some time in the GC during the mapper.
+ */
+ @Test
+ public void testGcCounter() throws Exception {
+ Path inputPath = getInputPath();
+ Path outputPath = getOutputPath();
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+
+ // Remove any existing input/output dirs so the job starts clean.
+ if (fs.exists(outputPath)) {
+ fs.delete(outputPath, true);
+ }
+
+ if (fs.exists(inputPath)) {
+ fs.delete(inputPath, true);
+ }
+
+ // Create one input file
+ createInputFile(inputPath, 0, 20);
+
+ // Now configure and run a map-only job (zero reduce tasks) using GCMapper.
+ Job job = new Job();
+ job.setMapperClass(GCMapper.class);
+ job.setNumReduceTasks(0);
+ job.getConfiguration().set("io.sort.mb", "25");
+ FileInputFormat.addInputPath(job, inputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ boolean ret = job.waitForCompletion(true);
+ assertTrue("job failed", ret);
+
+ // This job should have done *some* gc work.
+ // It had to clean up 400,000 objects (presumably 20 input records x 20,000 each; confirm in createInputFile).
+ // We strongly suspect this will result in a few milliseconds of effort.
+ Counter gcCounter = job.getCounters().findCounter(
+ TaskCounter.GC_TIME_MILLIS);
+ assertNotNull(gcCounter);
+ assertTrue("No time spent in gc", gcCounter.getValue() > 0);
+ }
+
+
+ /**
* Run a test with several mappers in parallel, operating at different
* speeds. Verify that the correct amount of output is created.
*/
|