Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8C9ED200C1A for ; Mon, 30 Jan 2017 07:04:01 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 8B5B8160B64; Mon, 30 Jan 2017 06:04:01 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6D860160B58 for ; Mon, 30 Jan 2017 07:04:00 +0100 (CET) Received: (qmail 34964 invoked by uid 500); 30 Jan 2017 06:03:54 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 33330 invoked by uid 99); 30 Jan 2017 06:03:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 30 Jan 2017 06:03:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BC122DFF26; Mon, 30 Jan 2017 06:03:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vvasudev@apache.org To: common-commits@hadoop.apache.org Date: Mon, 30 Jan 2017 06:04:23 -0000 Message-Id: <7fb02beb1c204bc0ba361b0c7a951735@git.apache.org> In-Reply-To: <9ef251a20c0f40bd9f677d9f1d48c278@git.apache.org> References: <9ef251a20c0f40bd9f677d9f1d48c278@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [31/52] [abbrv] hadoop git commit: MAPREDUCE-6829. Add peak memory usage counter for each task. (Miklos Szegedi via kasha) archived-at: Mon, 30 Jan 2017 06:04:01 -0000 MAPREDUCE-6829. Add peak memory usage counter for each task. 
(Miklos Szegedi via kasha) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c65f884f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c65f884f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c65f884f Branch: refs/heads/YARN-3926 Commit: c65f884fc7e08118524f8c88737119d8196b4c1b Parents: 44606aa Author: Karthik Kambatla Authored: Thu Jan 26 11:08:13 2017 -0800 Committer: Karthik Kambatla Committed: Thu Jan 26 11:08:13 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/hadoop/mapred/Task.java | 24 ++- .../apache/hadoop/mapreduce/TaskCounter.java | 8 +- .../counters/FrameworkCounterGroup.java | 6 +- .../hadoop/mapreduce/TaskCounter.properties | 4 + .../org/apache/hadoop/mapred/TestCounters.java | 31 ++- .../apache/hadoop/mapred/TestJobCounters.java | 188 +++++++++++++++++++ 6 files changed, 256 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c65f884f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 119d6a7..c1ae0ab 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -110,7 +110,11 @@ abstract public class Task implements Writable, Configurable { CPU_MILLISECONDS, PHYSICAL_MEMORY_BYTES, 
VIRTUAL_MEMORY_BYTES, - COMMITTED_HEAP_BYTES + COMMITTED_HEAP_BYTES, + MAP_PHYSICAL_MEMORY_BYTES_MAX, + MAP_VIRTUAL_MEMORY_BYTES_MAX, + REDUCE_PHYSICAL_MEMORY_BYTES_MAX, + REDUCE_VIRTUAL_MEMORY_BYTES_MAX } /** @@ -964,6 +968,24 @@ abstract public class Task implements Writable, Configurable { if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) { counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem); } + + if (pMem != ResourceCalculatorProcessTree.UNAVAILABLE) { + TaskCounter counter = isMapTask() ? + TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX : + TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX; + Counters.Counter pMemCounter = + counters.findCounter(counter); + pMemCounter.setValue(Math.max(pMemCounter.getValue(), pMem)); + } + + if (vMem != ResourceCalculatorProcessTree.UNAVAILABLE) { + TaskCounter counter = isMapTask() ? + TaskCounter.MAP_VIRTUAL_MEMORY_BYTES_MAX : + TaskCounter.REDUCE_VIRTUAL_MEMORY_BYTES_MAX; + Counters.Counter vMemCounter = + counters.findCounter(counter); + vMemCounter.setValue(Math.max(vMemCounter.getValue(), vMem)); + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/c65f884f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java index 42ef067..0fab96c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCounter.java @@ -25,7 +25,7 @@ import 
org.apache.hadoop.classification.InterfaceStability; @InterfaceAudience.Public @InterfaceStability.Evolving public enum TaskCounter { - MAP_INPUT_RECORDS, + MAP_INPUT_RECORDS, MAP_OUTPUT_RECORDS, MAP_SKIPPED_RECORDS, MAP_OUTPUT_BYTES, @@ -47,5 +47,9 @@ public enum TaskCounter { CPU_MILLISECONDS, PHYSICAL_MEMORY_BYTES, VIRTUAL_MEMORY_BYTES, - COMMITTED_HEAP_BYTES + COMMITTED_HEAP_BYTES, + MAP_PHYSICAL_MEMORY_BYTES_MAX, + MAP_VIRTUAL_MEMORY_BYTES_MAX, + REDUCE_PHYSICAL_MEMORY_BYTES_MAX, + REDUCE_VIRTUAL_MEMORY_BYTES_MAX; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c65f884f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java index e78fe2e..b51f528 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java @@ -100,7 +100,11 @@ public abstract class FrameworkCounterGroup, @Override public void increment(long incr) { - value += incr; + if (key.name().endsWith("_MAX")) { + value = value > incr ? 
value : incr; + } else { + value += incr; + } } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/c65f884f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties index d54b980..0fd1028 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/TaskCounter.properties @@ -37,3 +37,7 @@ COMMITTED_HEAP_BYTES.name= Total committed heap usage (bytes) CPU_MILLISECONDS.name= CPU time spent (ms) PHYSICAL_MEMORY_BYTES.name= Physical memory (bytes) snapshot VIRTUAL_MEMORY_BYTES.name= Virtual memory (bytes) snapshot +MAP_PHYSICAL_MEMORY_BYTES_MAX.name= Peak Map Physical memory (bytes) +MAP_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Map Virtual memory (bytes) +REDUCE_PHYSICAL_MEMORY_BYTES_MAX.name=Peak Reduce Physical memory (bytes) +REDUCE_VIRTUAL_MEMORY_BYTES_MAX.name= Peak Reduce Virtual memory (bytes) http://git-wip-us.apache.org/repos/asf/hadoop/blob/c65f884f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java index 5e2763e..010f0f3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java @@ -348,7 +348,36 @@ public class TestCounters { counterGroup.findCounter("Unknown"); Assert.assertNull(count2); } - + + @SuppressWarnings("rawtypes") + @Test + public void testTaskCounter() { + GroupFactory groupFactory = new GroupFactoryForTest(); + FrameworkGroupFactory frameworkGroupFactory = + groupFactory.newFrameworkGroupFactory(TaskCounter.class); + Group group = (Group) frameworkGroupFactory.newGroup("TaskCounter"); + + FrameworkCounterGroup counterGroup = + (FrameworkCounterGroup) group.getUnderlyingGroup(); + + org.apache.hadoop.mapreduce.Counter count1 = + counterGroup.findCounter( + TaskCounter.PHYSICAL_MEMORY_BYTES.toString()); + Assert.assertNotNull(count1); + count1.increment(10); + count1.increment(10); + Assert.assertEquals(20, count1.getValue()); + + // Verify that a _MAX counter keeps the largest increment instead of summing + org.apache.hadoop.mapreduce.Counter count2 = + counterGroup.findCounter( + TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX.toString()); + Assert.assertNotNull(count2); + count2.increment(5); + count2.increment(10); + Assert.assertEquals(10, count2.getValue()); + } + @Test public void testFilesystemCounter() { GroupFactory groupFactory = new GroupFactoryForTest(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/c65f884f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java ---------------------------------------------------------------------- diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java index 8b692ca..850c00a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobCounters.java @@ -40,10 +40,12 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter; +import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -751,4 +753,190 @@ public class TestJobCounters { } } + /** + * Test mapper. + */ + public static class TokenizerMapper extends + org.apache.hadoop.mapreduce.Mapper { + + private final static IntWritable ONE = new IntWritable(1); + private Text word = new Text(); + + public void map(Object key, Text value, Context context) + throws IOException, InterruptedException { + StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + context.write(word, ONE); + } + } + } + + /** + * Test reducer. + */ + public static class IntSumReducer extends + org.apache.hadoop.mapreduce.Reducer{ + /** + * Test customer counter. 
+ */ + public enum Counters { MY_COUNTER_MAX } + private IntWritable result = new IntWritable(); + + public void reduce(Text key, Iterable values, Context context) + throws IOException, InterruptedException { + int sum = 0; + for (IntWritable val : values) { + sum += val.get(); + } + result.set(sum); + context.write(key, result); + context.getCounter(Counters.MY_COUNTER_MAX).increment(100); + } + } + + /** + * Mock resource reporting. + */ + public static class MockResourceCalculatorProcessTree + extends ResourceCalculatorProcessTree { + + public MockResourceCalculatorProcessTree(String root) { + super(root); + } + + @Override + public void updateProcessTree() { + } + + @Override + public String getProcessTreeDump() { + return ""; + } + + @Override + public long getCumulativeCpuTime() { + return 0; + } + + @Override + public boolean checkPidPgrpidForMatch() { + return true; + } + + @Override + public long getRssMemorySize() { + return 1024; + } + + @Override + public long getVirtualMemorySize() { + return 2000; + } + + @Override + public float getCpuUsagePercent() { + return 0; + } + } + + @Test + public void testMockResourceCalculatorProcessTree() { + ResourceCalculatorProcessTree tree; + tree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree( + "1", TestJobCounters.MockResourceCalculatorProcessTree.class, + new Configuration()); + assertNotNull(tree); + } + + /** + * End to end test of maximum counters. 
+ * @throws IOException test failed + * @throws ClassNotFoundException test failed + * @throws InterruptedException test failed + */ + @Test + public void testMaxCounter() + throws IOException, ClassNotFoundException, InterruptedException { + // Create mapreduce cluster + MiniMRClientCluster mrCluster = MiniMRClientClusterFactory.create( + this.getClass(), 2, new Configuration()); + + try { + // Setup input and output paths + Path rootDir = + new Path(System.getProperty("test.build.data", "/tmp")); + Path testRootDir = new Path(rootDir, "testMaxCounter"); + Path testInputDir = new Path(testRootDir, "input"); + Path testOutputDir = new Path(testRootDir, "output"); + FileSystem fs = FileSystem.getLocal(new Configuration()); + fs.mkdirs(testInputDir); + Path testInputFile = new Path(testInputDir, "file01"); + FSDataOutputStream stream = + fs.create(testInputFile); + stream.writeChars("foo"); + stream.writeChars("bar"); + stream.close(); + fs.delete(testOutputDir, true); + + // Run job (1 mapper, 2 reducers) + Configuration conf = new Configuration(); + conf.setClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE, + MockResourceCalculatorProcessTree.class, + ResourceCalculatorProcessTree.class); + Job job = Job.getInstance(conf, "word count"); + job.setJarByClass(WordCount.class); + job.setMapperClass(TokenizerMapper.class); + job.setCombinerClass(IntSumReducer.class); + job.setReducerClass(IntSumReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + job.setNumReduceTasks(2); // make sure we have double here to test max + org.apache.hadoop.mapreduce.lib.input.FileInputFormat + .addInputPath(job, testInputDir); + org.apache.hadoop.mapreduce.lib.output.FileOutputFormat + .setOutputPath(job, testOutputDir); + assertTrue(job.waitForCompletion(true)); + + // Verify physical numbers + org.apache.hadoop.mapreduce.Counter maxMap = + job.getCounters().findCounter( + TaskCounter.MAP_PHYSICAL_MEMORY_BYTES_MAX); + 
org.apache.hadoop.mapreduce.Counter maxReduce = + job.getCounters().findCounter( + TaskCounter.REDUCE_PHYSICAL_MEMORY_BYTES_MAX); + org.apache.hadoop.mapreduce.Counter allP = + job.getCounters().findCounter( + TaskCounter.PHYSICAL_MEMORY_BYTES); + assertEquals(1024, maxMap.getValue()); + assertEquals(1024, maxReduce.getValue()); + assertEquals(3072, allP.getValue()); + + // Verify virtual numbers + org.apache.hadoop.mapreduce.Counter maxMapV = + job.getCounters().findCounter( + TaskCounter.MAP_VIRTUAL_MEMORY_BYTES_MAX); + org.apache.hadoop.mapreduce.Counter maxReduceV = + job.getCounters().findCounter( + TaskCounter.REDUCE_VIRTUAL_MEMORY_BYTES_MAX); + org.apache.hadoop.mapreduce.Counter allV = + job.getCounters().findCounter( + TaskCounter.VIRTUAL_MEMORY_BYTES); + assertEquals(2000, maxMapV.getValue()); + assertEquals(2000, maxReduceV.getValue()); + assertEquals(6000, allV.getValue()); + + // Make sure custom counters are not affected by the _MAX + // code in FrameworkCounterGroup + org.apache.hadoop.mapreduce.Counter customerCounter = + job.getCounters().findCounter( + IntSumReducer.Counters.MY_COUNTER_MAX); + assertEquals(200, customerCounter.getValue()); + + fs.delete(testInputDir, true); + fs.delete(testOutputDir, true); + } finally { + mrCluster.stop(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org