Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8515E174AA for ; Tue, 3 Feb 2015 18:42:04 +0000 (UTC) Received: (qmail 42159 invoked by uid 500); 3 Feb 2015 18:42:05 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 42088 invoked by uid 500); 3 Feb 2015 18:42:05 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 42079 invoked by uid 99); 3 Feb 2015 18:42:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Feb 2015 18:42:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1133EE02D8; Tue, 3 Feb 2015 18:42:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rkanter@apache.org To: common-commits@hadoop.apache.org Message-Id: <1b46bb4ed3fe4c819bc5ed652d5f915c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-3022. Expose Container resource information from NodeManager for monitoring (adhoot via ranter) Date: Tue, 3 Feb 2015 18:42:05 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/trunk 80705e034 -> f7a77819a YARN-3022. 
Expose Container resource information from NodeManager for monitoring (adhoot via ranter) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f7a77819 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f7a77819 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f7a77819 Branch: refs/heads/trunk Commit: f7a77819a1e4ff394e110941c1f8dd80f47dd38f Parents: 80705e0 Author: Robert Kanter Authored: Tue Feb 3 10:39:41 2015 -0800 Committer: Robert Kanter Committed: Tue Feb 3 10:39:41 2015 -0800 ---------------------------------------------------------------------- .../hadoop/metrics2/impl/MetricsRecords.java | 92 ++++++++++++++++++++ hadoop-yarn-project/CHANGES.txt | 3 + .../container/ContainerImpl.java | 3 +- .../monitor/ContainerMetrics.java | 42 +++++++-- .../monitor/ContainerStartMonitoringEvent.java | 7 +- .../monitor/ContainersMonitorImpl.java | 30 ++++++- .../monitor/TestContainerMetrics.java | 40 ++++++++- 7 files changed, 204 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7a77819/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java new file mode 100644 index 0000000..3c0999e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/MetricsRecords.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.impl; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsTag; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Utility class mainly for tests + */ +public class MetricsRecords { + + public static void assertTag(MetricsRecord record, String tagName, + String expectedValue) { + MetricsTag processIdTag = getFirstTagByName(record, + tagName); + assertNotNull(processIdTag); + assertEquals(expectedValue, processIdTag.value()); + } + + public static void assertMetric(MetricsRecord record, + String metricName, + Number expectedValue) { + AbstractMetric resourceLimitMetric = getFirstMetricByName( + record, metricName); + assertNotNull(resourceLimitMetric); + assertEquals(expectedValue, resourceLimitMetric.value()); + } + + private static MetricsTag getFirstTagByName(MetricsRecord record, String name) { + return Iterables.getFirst(Iterables.filter(record.tags(), + new MetricsTagPredicate(name)), null); + } + + private static AbstractMetric getFirstMetricByName( + MetricsRecord record, String name) { + return Iterables.getFirst( + Iterables.filter(record.metrics(), new AbstractMetricPredicate(name)), + null); + } + + private 
static class MetricsTagPredicate implements Predicate<MetricsTag> { + private String tagName; + + public MetricsTagPredicate(String tagName) { + + this.tagName = tagName; + } + + @Override + public boolean apply(MetricsTag input) { + return input.name().equals(tagName); + } + } + + private static class AbstractMetricPredicate + implements Predicate<AbstractMetric> { + private String metricName; + + public AbstractMetricPredicate( + String metricName) { + this.metricName = metricName; + } + + @Override + public boolean apply(AbstractMetric input) { + return input.name().equals(metricName); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7a77819/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3ece974..de3572e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -228,6 +228,9 @@ Release 2.7.0 - UNRELEASED YARN-3085. Application summary should include the application type (Rohith via jlowe) + YARN-3022. 
Expose Container resource information from NodeManager for + monitoring (adhoot via ranter) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7a77819/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 3f6616a..57b0bda 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -494,10 +494,11 @@ public class ContainerImpl implements Container { YarnConfiguration.NM_VMEM_PMEM_RATIO, YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO); long vmemBytes = (long) (pmemRatio * pmemBytes); + int cpuVcores = getResource().getVirtualCores(); dispatcher.getEventHandler().handle( new ContainerStartMonitoringEvent(containerId, - vmemBytes, pmemBytes)); + vmemBytes, pmemBytes, cpuVcores)); } private void addDiagnostics(String... 
diags) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7a77819/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java index 12201dd..7850688 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerMetrics.java @@ -27,6 +27,7 @@ import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableStat; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -41,11 +42,28 @@ import static org.apache.hadoop.metrics2.lib.Interns.info; @Metrics(context="container") public class ContainerMetrics implements MetricsSource { + public static final String PMEM_LIMIT_METRIC_NAME = "pMemLimit"; + public static final String VMEM_LIMIT_METRIC_NAME = "vMemLimit"; + public static final String VCORE_LIMIT_METRIC_NAME = "vCoreLimit"; + public static final String PMEM_USAGE_METRIC_NAME = "pMemUsage"; + @Metric public MutableStat 
pMemMBsStat; + @Metric + public MutableGaugeInt pMemLimitMbs; + + @Metric + public MutableGaugeInt vMemLimitMbs; + + @Metric + public MutableGaugeInt cpuVcores; + static final MetricsInfo RECORD_INFO = - info("ContainerUsage", "Resource usage by container"); + info("ContainerResource", "Resource limit and usage by container"); + + public static final MetricsInfo PROCESSID_INFO = + info("ContainerPid", "Container Process Id"); final MetricsInfo recordInfo; final MetricsRegistry registry; @@ -76,7 +94,13 @@ public class ContainerMetrics implements MetricsSource { scheduleTimerTaskIfRequired(); this.pMemMBsStat = registry.newStat( - "pMem", "Physical memory stats", "Usage", "MBs", true); + PMEM_USAGE_METRIC_NAME, "Physical memory stats", "Usage", "MBs", true); + this.pMemLimitMbs = registry.newGauge( + PMEM_LIMIT_METRIC_NAME, "Physical memory limit in MBs", 0); + this.vMemLimitMbs = registry.newGauge( + VMEM_LIMIT_METRIC_NAME, "Virtual memory limit in MBs", 0); + this.cpuVcores = registry.newGauge( + VCORE_LIMIT_METRIC_NAME, "CPU limit in number of vcores", 0); } ContainerMetrics tag(MetricsInfo info, ContainerId containerId) { @@ -88,10 +112,6 @@ public class ContainerMetrics implements MetricsSource { return RECORD_INFO.name() + "_" + containerId.toString(); } - public static ContainerMetrics forContainer(ContainerId containerId) { - return forContainer(containerId, -1L); - } - public static ContainerMetrics forContainer( ContainerId containerId, long flushPeriodMs) { return forContainer( @@ -150,6 +170,16 @@ public class ContainerMetrics implements MetricsSource { this.pMemMBsStat.add(memoryMBs); } + public void recordProcessId(String processId) { + registry.tag(PROCESSID_INFO, processId); + } + + public void recordResourceLimit(int vmemLimit, int pmemLimit, int cpuVcores) { + this.vMemLimitMbs.set(vmemLimit); + this.pMemLimitMbs.set(pmemLimit); + this.cpuVcores.set(cpuVcores); + } + private synchronized void scheduleTimerTaskIfRequired() { if (flushPeriodMs > 0) { 
// Lazily initialize timer http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7a77819/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java index 407ada5..56e2d8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainerStartMonitoringEvent.java @@ -24,12 +24,14 @@ public class ContainerStartMonitoringEvent extends ContainersMonitorEvent { private final long vmemLimit; private final long pmemLimit; + private final int cpuVcores; public ContainerStartMonitoringEvent(ContainerId containerId, - long vmemLimit, long pmemLimit) { + long vmemLimit, long pmemLimit, int cpuVcores) { super(containerId, ContainersMonitorEventType.START_MONITORING_CONTAINER); this.vmemLimit = vmemLimit; this.pmemLimit = pmemLimit; + this.cpuVcores = cpuVcores; } public long getVmemLimit() { @@ -40,4 +42,7 @@ public class ContainerStartMonitoringEvent extends ContainersMonitorEvent { return this.pmemLimit; } + public int getCpuVcores() { + return this.cpuVcores; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7a77819/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 0ae4325..2cecda6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -219,14 +219,17 @@ public class ContainersMonitorImpl extends AbstractService implements private ResourceCalculatorProcessTree pTree; private long vmemLimit; private long pmemLimit; + private int cpuVcores; public ProcessTreeInfo(ContainerId containerId, String pid, - ResourceCalculatorProcessTree pTree, long vmemLimit, long pmemLimit) { + ResourceCalculatorProcessTree pTree, long vmemLimit, long pmemLimit, + int cpuVcores) { this.containerId = containerId; this.pid = pid; this.pTree = pTree; this.vmemLimit = vmemLimit; this.pmemLimit = pmemLimit; + this.cpuVcores = cpuVcores; } public ContainerId getContainerId() { @@ -259,6 +262,14 @@ public class ContainersMonitorImpl extends AbstractService implements public long getPmemLimit() { return this.pmemLimit; } + + /** + * Return the number of cpu vcores assigned + * @return + */ + public int 
getCpuVcores() { + return this.cpuVcores; + } } @@ -362,7 +373,8 @@ public class ContainersMonitorImpl extends AbstractService implements synchronized (containersToBeRemoved) { for (ContainerId containerId : containersToBeRemoved) { if (containerMetricsEnabled) { - ContainerMetrics.forContainer(containerId).finished(); + ContainerMetrics.forContainer( + containerId, containerMetricsPeriodMs).finished(); } trackingContainers.remove(containerId); LOG.info("Stopping resource-monitoring for " + containerId); @@ -397,6 +409,17 @@ public class ContainersMonitorImpl extends AbstractService implements ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf); ptInfo.setPid(pId); ptInfo.setProcessTree(pt); + + if (containerMetricsEnabled) { + ContainerMetrics usageMetrics = ContainerMetrics + .forContainer(containerId, containerMetricsPeriodMs); + int cpuVcores = ptInfo.getCpuVcores(); + final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20); + final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20); + usageMetrics.recordResourceLimit( + vmemLimit, pmemLimit, cpuVcores); + usageMetrics.recordProcessId(pId); + } } } // End of initializing any uninitialized processTrees @@ -576,7 +599,8 @@ public class ContainersMonitorImpl extends AbstractService implements synchronized (this.containersToBeAdded) { ProcessTreeInfo processTreeInfo = new ProcessTreeInfo(containerId, null, null, - startEvent.getVmemLimit(), startEvent.getPmemLimit()); + startEvent.getVmemLimit(), startEvent.getPmemLimit(), + startEvent.getCpuVcores()); this.containersToBeAdded.put(containerId, processTreeInfo); } break; http://git-wip-us.apache.org/repos/asf/hadoop/blob/f7a77819/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java index 158e2a8..c628648 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainerMetrics.java @@ -18,19 +18,20 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; +import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.metrics2.impl.MetricsRecords; import org.apache.hadoop.yarn.api.records.ContainerId; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import java.util.Timer; - public class TestContainerMetrics { @Test @@ -71,4 +72,39 @@ public class TestContainerMetrics { metrics.getMetrics(collector, true); assertEquals(ERR, 0, collector.getRecords().size()); } + + @Test + public void testContainerMetricsLimit() throws InterruptedException { + final String ERR = "Error in number of records"; + + MetricsSystem system = mock(MetricsSystem.class); + doReturn(this).when(system).register(anyString(), anyString(), any()); + + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + 
ContainerId containerId = mock(ContainerId.class); + ContainerMetrics metrics = ContainerMetrics.forContainer(containerId, 100); + + int anyPmemLimit = 1024; + int anyVmemLimit = 2048; + int anyVcores = 10; + String anyProcessId = "1234"; + + metrics.recordResourceLimit(anyVmemLimit, anyPmemLimit, anyVcores); + metrics.recordProcessId(anyProcessId); + + Thread.sleep(110); + metrics.getMetrics(collector, true); + assertEquals(ERR, 1, collector.getRecords().size()); + MetricsRecord record = collector.getRecords().get(0); + + MetricsRecords.assertTag(record, ContainerMetrics.PROCESSID_INFO.name(), + anyProcessId); + + MetricsRecords.assertMetric(record, ContainerMetrics + .PMEM_LIMIT_METRIC_NAME, anyPmemLimit); + MetricsRecords.assertMetric(record, ContainerMetrics.VMEM_LIMIT_METRIC_NAME, anyVmemLimit); + MetricsRecords.assertMetric(record, ContainerMetrics.VCORE_LIMIT_METRIC_NAME, anyVcores); + + collector.clear(); + } }