From mapreduce-commits-return-1560-apmail-hadoop-mapreduce-commits-archive=hadoop.apache.org@hadoop.apache.org Sat Apr 30 20:35:18 2011 Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A73C83E5E for ; Sat, 30 Apr 2011 20:35:18 +0000 (UTC) Received: (qmail 23336 invoked by uid 500); 30 Apr 2011 20:35:18 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 23304 invoked by uid 500); 30 Apr 2011 20:35:18 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 23296 invoked by uid 99); 30 Apr 2011 20:35:17 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 30 Apr 2011 20:35:17 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 30 Apr 2011 20:35:12 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5D06023888FE; Sat, 30 Apr 2011 20:34:50 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1098152 - in /hadoop/mapreduce/branches/MR-279/yarn: yarn-common/src/main/java/org/apache/hadoop/yarn/util/ yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ yarn-server/yarn-serv... 
Date: Sat, 30 Apr 2011 20:34:50 -0000 To: mapreduce-commits@hadoop.apache.org From: mahadev@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110430203450.5D06023888FE@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mahadev Date: Sat Apr 30 20:34:49 2011 New Revision: 1098152 URL: http://svn.apache.org/viewvc?rev=1098152&view=rev Log: MAPREDUCE-2434. Metrics for ResourceManager. Contributed by Luke Lu. - added files Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java?rev=1098152&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java Sat Apr 30 20:34:49 2011 @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.util; + +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; + +/** + * Convenient API record utils + */ +public class Records { + // The default record factory + private static final RecordFactory factory = + RecordFactoryProvider.getRecordFactory(null); + + public static <T> T newRecord(Class<T> cls) { + return factory.newRecordInstance(cls); + } +} Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java?rev=1098152&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java Sat Apr 30 20:34:49 2011 @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.util; + +/** + * Some utilities for introspection + */ +public class Self { + private static boolean firstTime = true; + private static boolean isUnitTest = false; + private static boolean isJUnitTest = false; + + public synchronized static boolean isUnitTest() { + detect(); + return isUnitTest; + } + + public synchronized static boolean isJUnitTest() { + detect(); + return isJUnitTest; + } + + private synchronized static void detect() { + if (!firstTime) { + return; + } + firstTime = false; + for (StackTraceElement e : new Throwable().getStackTrace()) { + String className = e.getClassName(); + if (className.startsWith("org.junit")) { + isUnitTest = isJUnitTest = true; + return; + } + if (className.startsWith("org.apache.maven.surefire")) { + isUnitTest = true; + return; + } + } + } +} Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java?rev=1098152&view=auto ============================================================================== --- 
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java Sat Apr 30 20:34:49 2011 @@ -0,0 +1,97 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.yarn.server.resourcemanager.resource; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Records; + +@Private +@Evolving +public class Resources { + // Java doesn't have const :( + private static final Resource NONE = createResource(0); + + public static Resource createResource(int memory) { + Resource resource = Records.newRecord(Resource.class); + resource.setMemory(memory); + return resource; + } + + public static Resource none() { + assert NONE.getMemory() == 0 : "NONE should be empty"; + return NONE; + } + + public static Resource clone(Resource res) { + return createResource(res.getMemory()); + } + + public static Resource addTo(Resource lhs, Resource rhs) { + lhs.setMemory(lhs.getMemory() + rhs.getMemory()); + return lhs; + } + + public static Resource add(Resource lhs, Resource rhs) { + return addTo(clone(lhs), rhs); + } + + public static Resource subtractFrom(Resource lhs, Resource rhs) { + lhs.setMemory(lhs.getMemory() - rhs.getMemory()); + return lhs; + } + + public static Resource subtract(Resource lhs, Resource rhs) { + return subtractFrom(clone(lhs), rhs); + } + + public static Resource negate(Resource resource) { + return subtract(NONE, resource); + } + + public static Resource multiplyTo(Resource lhs, int by) { + lhs.setMemory(lhs.getMemory() * by); + return lhs; + } + + public static Resource multiply(Resource lhs, int by) { + return multiplyTo(clone(lhs), by); + } + + public static boolean equals(Resource lhs, Resource rhs) { + return lhs.getMemory() == rhs.getMemory(); + } + + public static boolean lessThan(Resource lhs, Resource rhs) { + return lhs.getMemory() < rhs.getMemory(); + } + + public static boolean lessThanOrEqual(Resource lhs, Resource rhs) { + return lhs.getMemory() <= rhs.getMemory(); + } + + public static boolean 
greaterThan(Resource lhs, Resource rhs) { + return lhs.getMemory() > rhs.getMemory(); + } + + public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) { + return lhs.getMemory() >= rhs.getMemory(); + } +} Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1098152&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java Sat Apr 30 20:34:49 2011 @@ -0,0 +1,256 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import com.google.common.base.Splitter; +import java.util.Map; +import java.util.HashMap; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import static org.apache.hadoop.metrics2.lib.Interns.info; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterInt; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.yarn.api.records.ApplicationState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Self; +import static 
org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.*; + +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +@InterfaceAudience.Private +@Metrics(context="yarn") +public class QueueMetrics { + @Metric("# of apps submitted") MutableCounterInt appsSubmitted; + @Metric("# of running apps") MutableGaugeInt appsRunning; + @Metric("# of pending apps") MutableGaugeInt appsPending; + @Metric("# of apps completed") MutableCounterInt appsCompleted; + @Metric("# of apps killed") MutableCounterInt appsKilled; + @Metric("# of apps failed") MutableCounterInt appsFailed; + + @Metric("Allocated memory in GiB") MutableGaugeInt allocatedGB; + @Metric("# of allocated containers") MutableGaugeInt allocatedContainers; + @Metric("Available memory in GiB") MutableGaugeInt availableGB; + @Metric("Pending memory allocation in GiB") MutableGaugeInt pendingGB; + @Metric("# of pending containers") MutableGaugeInt pendingContainers; + + static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class); + static final int GB = 1024; // resource.memory is in MB + static final MetricsInfo RECORD_INFO = info("SchedulerMetrics", + "Metrics for the resource scheduler"); + static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue"); + static final MetricsInfo USER_INFO = info("User", "Metrics by user"); + static final Splitter Q_SPLITTER = + Splitter.on('.').omitEmptyStrings().trimResults(); + + // MSXXX: until metrics system handle this automatically + private static QueueMetrics dummyMetrics = null; + private static boolean firstTime = true; + + final MetricsRegistry registry; + final String queueName; + final QueueMetrics parent; + final MetricsSystem metricsSystem; + private final Map<String, QueueMetrics> users; + + QueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics) { + registry = new MetricsRegistry(RECORD_INFO); + this.queueName = queueName; + this.parent = parent != null ? parent.getMetrics() : null; + this.users = enableUserMetrics ? 
new HashMap<String, QueueMetrics>() + : null; + metricsSystem = ms; + } + + QueueMetrics tag(MetricsInfo info, String value) { + registry.tag(info, value); + return this; + } + + static StringBuilder sourceName(String queueName) { + StringBuilder sb = new StringBuilder(RECORD_INFO.name()); + int i = 0; + for (String node : Q_SPLITTER.split(queueName)) { + sb.append(",q").append(i++).append('=').append(node); + } + return sb; + } + + public synchronized + static QueueMetrics forQueue(String queueName, Queue parent, + boolean enableUserMetrics) { + MetricsSystem ms = null; + if (firstTime) { + firstTime = false; + ms = DefaultMetricsSystem.instance(); + } else if (!Self.isUnitTest()) { + ms = DefaultMetricsSystem.instance(); + } else { + return dummyMetrics; // metrics specific tests should use the long form + } + QueueMetrics metrics = forQueue(ms, queueName, parent, enableUserMetrics); + if (dummyMetrics == null) { + dummyMetrics = metrics; + } + return metrics; + } + + public static QueueMetrics forQueue(MetricsSystem ms, String queueName, + Queue parent, boolean enableUserMetrics) { + QueueMetrics metrics = new QueueMetrics(ms, queueName, parent, + enableUserMetrics).tag(QUEUE_INFO, queueName); + return ms == null ? 
metrics : ms.register(sourceName(queueName).toString(), + "Metrics for queue: " + queueName, metrics); + } + + synchronized QueueMetrics getUserMetrics(String userName) { + if (users == null) { + return null; + } + QueueMetrics metrics = users.get(userName); + if (metrics == null) { + metrics = new QueueMetrics(metricsSystem, queueName, null, false); + users.put(userName, metrics); + metricsSystem.register( + sourceName(queueName).append(",user=").append(userName).toString(), + "Metrics for user '"+ userName +"' in queue '"+ queueName +"'", + metrics.tag(QUEUE_INFO, queueName).tag(USER_INFO, userName)); + } + return metrics; + } + + public void submitApp(String user) { + appsSubmitted.incr(); + appsPending.incr(); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.submitApp(user); + } + if (parent != null) { + parent.submitApp(user); + } + } + + public void incrAppsRunning(String user) { + appsRunning.incr(); + appsPending.decr(); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.incrAppsRunning(user); + } + if (parent != null) { + parent.incrAppsRunning(user); + } + } + + public void finishApp(Application app) { + ApplicationState state = app.getState(); + switch (state) { + case KILLED: appsKilled.incr(); break; + case FAILED: appsFailed.incr(); break; + default: appsCompleted.incr(); break; + } + if (app.isPending()) { + appsPending.decr(); + } else { + appsRunning.decr(); + } + QueueMetrics userMetrics = getUserMetrics(app.getUser()); + if (userMetrics != null) { + userMetrics.finishApp(app); + } + if (parent != null) { + parent.finishApp(app); + } + } + + /** + * Set available resources. To be called by scheduler periodically as + * resources become available. + * @param mb memory in MB + */ + public void setAvailableQueueMemory(int mb) { + availableGB.set(mb/GB); + } + + /** + * Set available resources. 
To be called by scheduler periodically as + * resources become available. + * @param user + * @param mb memory in MB + */ + public void setAvailableUserMemory(String user, int mb) { + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.setAvailableQueueMemory(mb); + } + } + + /** + * Increment pending resource metrics + * @param user + * @param containers + * @param res the TOTAL delta of resources note this is different from + * the other APIs which use per container resource + */ + public void incrPendingResources(String user, int containers, Resource res) { + _incrPendingResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.incrPendingResources(user, containers, res); + } + if (parent != null) { + parent.incrPendingResources(user, containers, res); + } + } + + private void _incrPendingResources(int containers, Resource res) { + pendingContainers.incr(containers); + pendingGB.incr(res.getMemory()/GB); + } + + public void decrPendingResources(String user, int containers, Resource res) { + _decrPendingResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.decrPendingResources(user, containers, res); + } + if (parent != null) { + parent.decrPendingResources(user, containers, res); + } + } + + private void _decrPendingResources(int containers, Resource res) { + pendingContainers.decr(containers); + pendingGB.decr(res.getMemory()/GB); + } + + public void allocateResources(String user, int containers, Resource res) { + allocatedContainers.incr(containers); + allocatedGB.incr(res.getMemory()/GB * containers); + _decrPendingResources(containers, multiply(res, containers)); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.allocateResources(user, containers, res); + } + if (parent != null) { + parent.allocateResources(user, containers, res); + } + } + + 
public void releaseResources(String user, int containers, Resource res) { + allocatedContainers.decr(containers); + allocatedGB.decr(res.getMemory()/GB * containers); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.releaseResources(user, containers, res); + } + if (parent != null) { + parent.releaseResources(user, containers, res); + } + } +} Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1098152&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java (added) +++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java Sat Apr 30 20:34:49 2011 @@ -0,0 +1,195 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import static org.apache.hadoop.test.MetricsAsserts.*; +import static org.apache.hadoop.test.MockitoMaker.*; +import org.apache.hadoop.yarn.api.records.ApplicationState; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; + +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class TestQueueMetrics { + static final int GB = 1024; // MB + 
+ MetricsSystem ms; + + @Before public void setup() { + ms = new MetricsSystemImpl(); + } + + @Test public void testDefaultSingleQueueMetrics() { + String queueName = "single"; + String user = "alice"; + + QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false); + MetricsSource queueSource= queueSource(ms, queueName); + Application app = mockApp(user); + + metrics.submitApp(user); + MetricsSource userSource = userSource(ms, queueName, user); + checkApps(queueSource, 1, 1, 0, 0, 0, 0); + + metrics.setAvailableQueueMemory(100*GB); + metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); + // Available resources is set externally, as it depends on dynamic + // configurable cluster/queue resources + checkResources(queueSource, 0, 0, 100, 15, 5); + + metrics.incrAppsRunning(user); + checkApps(queueSource, 1, 0, 1, 0, 0, 0); + + metrics.allocateResources(user, 3, Resources.createResource(2*GB)); + checkResources(queueSource, 6, 3, 100, 9, 2); + + metrics.releaseResources(user, 1, Resources.createResource(2*GB)); + checkResources(queueSource, 4, 2, 100, 9, 2); + + metrics.finishApp(app); + checkApps(queueSource, 1, 0, 0, 1, 0, 0); + assertNull(userSource); + } + + @Test public void testSingleQueueWithUserMetrics() { + String queueName = "single2"; + String user = "dodo"; + + QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true); + MetricsSource queueSource = queueSource(ms, queueName); + Application app = mockApp(user); + + metrics.submitApp(user); + MetricsSource userSource = userSource(ms, queueName, user); + + checkApps(queueSource, 1, 1, 0, 0, 0, 0); + checkApps(userSource, 1, 1, 0, 0, 0, 0); + + metrics.setAvailableQueueMemory(100*GB); + metrics.setAvailableUserMemory(user, 10*GB); + metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); + // Available resources is set externally, as it depends on dynamic + // configurable cluster/queue resources + checkResources(queueSource, 0, 0, 100, 15, 5); + 
checkResources(userSource, 0, 0, 10, 15, 5); + + metrics.incrAppsRunning(user); + checkApps(queueSource, 1, 0, 1, 0, 0, 0); + checkApps(userSource, 1, 0, 1, 0, 0, 0); + + metrics.allocateResources(user, 3, Resources.createResource(2*GB)); + checkResources(queueSource, 6, 3, 100, 9, 2); + checkResources(userSource, 6, 3, 10, 9, 2); + + metrics.releaseResources(user, 1, Resources.createResource(2*GB)); + checkResources(queueSource, 4, 2, 100, 9, 2); + checkResources(userSource, 4, 2, 10, 9, 2); + + metrics.finishApp(app); + checkApps(queueSource, 1, 0, 0, 1, 0, 0); + checkApps(userSource, 1, 0, 0, 1, 0, 0); + } + + @Test public void testTwoLevelWithUserMetrics() { + String parentQueueName = "root"; + String leafQueueName = "root.leaf"; + String user = "alice"; + + QueueMetrics parentMetrics = + QueueMetrics.forQueue(ms, parentQueueName, null, true); + Queue parentQueue = make(stub(Queue.class).returning(parentMetrics). + from.getMetrics()); + QueueMetrics metrics = + QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true); + MetricsSource parentQueueSource = queueSource(ms, parentQueueName); + MetricsSource queueSource = queueSource(ms, leafQueueName); + Application app = mockApp(user); + + metrics.submitApp(user); + MetricsSource userSource = userSource(ms, leafQueueName, user); + MetricsSource parentUserSource = userSource(ms, parentQueueName, user); + + checkApps(queueSource, 1, 1, 0, 0, 0, 0); + checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0); + checkApps(userSource, 1, 1, 0, 0, 0, 0); + checkApps(parentUserSource, 1, 1, 0, 0, 0, 0); + + parentMetrics.setAvailableQueueMemory(100*GB); + metrics.setAvailableQueueMemory(100*GB); + parentMetrics.setAvailableUserMemory(user, 10*GB); + metrics.setAvailableUserMemory(user, 10*GB); + metrics.incrPendingResources(user, 5, Resources.createResource(15*GB)); + checkResources(queueSource, 0, 0, 100, 15, 5); + checkResources(parentQueueSource, 0, 0, 100, 15, 5); + checkResources(userSource, 0, 0, 10, 15, 5); + 
checkResources(parentUserSource, 0, 0, 10, 15, 5); + + metrics.incrAppsRunning(user); + checkApps(queueSource, 1, 0, 1, 0, 0, 0); + checkApps(userSource, 1, 0, 1, 0, 0, 0); + + metrics.allocateResources(user, 3, Resources.createResource(2*GB)); + // Available resources is set externally, as it depends on dynamic + // configurable cluster/queue resources + checkResources(queueSource, 6, 3, 100, 9, 2); + checkResources(parentQueueSource, 6, 3, 100, 9, 2); + checkResources(userSource, 6, 3, 10, 9, 2); + checkResources(parentUserSource, 6, 3, 10, 9, 2); + + metrics.releaseResources(user, 1, Resources.createResource(2*GB)); + checkResources(queueSource, 4, 2, 100, 9, 2); + checkResources(parentQueueSource, 4, 2, 100, 9, 2); + checkResources(userSource, 4, 2, 10, 9, 2); + checkResources(parentUserSource, 4, 2, 10, 9, 2); + + metrics.finishApp(app); + checkApps(queueSource, 1, 0, 0, 1, 0, 0); + checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0); + checkApps(userSource, 1, 0, 0, 1, 0, 0); + checkApps(parentUserSource, 1, 0, 0, 1, 0, 0); + } + + public static void checkApps(MetricsSource source, int submitted, int pending, + int running, int completed, int failed, int killed) { + MetricsRecordBuilder rb = getMetrics(source); + assertCounter("AppsSubmitted", submitted, rb); + assertGauge("AppsPending", pending, rb); + assertGauge("AppsRunning", running, rb); + assertCounter("AppsCompleted", completed, rb); + assertCounter("AppsFailed", failed, rb); + assertCounter("AppsKilled", killed, rb); + } + + public static void checkResources(MetricsSource source, int allocGB, + int allocCtnrs, int availGB, int pendingGB, int pendingCtnrs) { + MetricsRecordBuilder rb = getMetrics(source); + assertGauge("AllocatedGB", allocGB, rb); + assertGauge("AllocatedContainers", allocCtnrs, rb); + assertGauge("AvailableGB", availGB, rb); + assertGauge("PendingGB", pendingGB, rb); + assertGauge("PendingContainers", pendingCtnrs, rb); + } + + private static Application mockApp(String user) { + 
Application app = mock(Application.class); + when(app.getState()).thenReturn(ApplicationState.RUNNING); + when(app.getUser()).thenReturn(user); + return app; + } + + public static MetricsSource queueSource(MetricsSystem ms, String queue) { + MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue).toString()); + return s; + } + + public static MetricsSource userSource(MetricsSystem ms, String queue, + String user) { + MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue). + append(",user=").append(user).toString()); + return s; + } +}