hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r1098152 - in /hadoop/mapreduce/branches/MR-279/yarn: yarn-common/src/main/java/org/apache/hadoop/yarn/util/ yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ yarn-server/yarn-serv...
Date Sat, 30 Apr 2011 20:34:50 GMT
Author: mahadev
Date: Sat Apr 30 20:34:49 2011
New Revision: 1098152

URL: http://svn.apache.org/viewvc?rev=1098152&view=rev
Log:
MAPREDUCE-2434. Metrics for ResourceManager. Contributed by Luke Lu. - added files

Added:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java?rev=1098152&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java
(added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Records.java
Sat Apr 30 20:34:49 2011
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util;
+
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+/**
+ * Convenient API record utils
+ */
+public class Records {
+  // The default record factory
+  private static final RecordFactory factory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  public static <T> T newRecord(Class<T> cls) {
+    return factory.newRecordInstance(cls);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java?rev=1098152&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java
(added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/Self.java
Sat Apr 30 20:34:49 2011
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util;
+
+/**
+ * Some utilities for introspection
+ */
+public class Self {
+  private static boolean firstTime = true;
+  private static boolean isUnitTest = false;
+  private static boolean isJUnitTest = false;
+
+  public synchronized static boolean isUnitTest() {
+    detect();
+    return isUnitTest;
+  }
+
+  public synchronized static boolean isJUnitTest() {
+    detect();
+    return isJUnitTest;
+  }
+
+  private synchronized static void detect() {
+    if (!firstTime) {
+      return;
+    }
+    firstTime = false;
+    for (StackTraceElement e : new Throwable().getStackTrace()) {
+      String className = e.getClassName();
+      if (className.startsWith("org.junit")) {
+        isUnitTest = isJUnitTest = true;
+        return;
+      }
+      if (className.startsWith("org.apache.maven.surefire")) {
+        isUnitTest = true;
+        return;
+      }
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java?rev=1098152&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
(added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
Sat Apr 30 20:34:49 2011
@@ -0,0 +1,97 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.resource;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+@Private
+@Evolving
+public class Resources {
+  // Java doesn't have const :(
+  private static final Resource NONE = createResource(0);
+
+  public static Resource createResource(int memory) {
+    Resource resource = Records.newRecord(Resource.class);
+    resource.setMemory(memory);
+    return resource;
+  }
+
+  public static Resource none() {
+    assert NONE.getMemory() == 0 : "NONE should be empty";
+    return NONE;
+  }
+
+  public static Resource clone(Resource res) {
+    return createResource(res.getMemory());
+  }
+
+  public static Resource addTo(Resource lhs, Resource rhs) {
+    lhs.setMemory(lhs.getMemory() + rhs.getMemory());
+    return lhs;
+  }
+
+  public static Resource add(Resource lhs, Resource rhs) {
+    return addTo(clone(lhs), rhs);
+  }
+
+  public static Resource subtractFrom(Resource lhs, Resource rhs) {
+    lhs.setMemory(lhs.getMemory() - rhs.getMemory());
+    return lhs;
+  }
+
+  public static Resource subtract(Resource lhs, Resource rhs) {
+    return subtractFrom(clone(lhs), rhs);
+  }
+
+  public static Resource negate(Resource resource) {
+    return subtract(NONE, resource);
+  }
+
+  public static Resource multiplyTo(Resource lhs, int by) {
+    lhs.setMemory(lhs.getMemory() * by);
+    return lhs;
+  }
+
+  public static Resource multiply(Resource lhs, int by) {
+    return multiplyTo(clone(lhs), by);
+  }
+
+  public static boolean equals(Resource lhs, Resource rhs) {
+    return lhs.getMemory() == rhs.getMemory();
+  }
+
+  public static boolean lessThan(Resource lhs, Resource rhs) {
+    return lhs.getMemory() < rhs.getMemory();
+  }
+
+  public static boolean lessThanOrEqual(Resource lhs, Resource rhs) {
+    return lhs.getMemory() <= rhs.getMemory();
+  }
+
+  public static boolean greaterThan(Resource lhs, Resource rhs) {
+    return lhs.getMemory() > rhs.getMemory();
+  }
+
+  public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) {
+    return lhs.getMemory() >= rhs.getMemory();
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java?rev=1098152&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
(added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
Sat Apr 30 20:34:49 2011
@@ -0,0 +1,256 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import com.google.common.base.Splitter;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Self;
+import static org.apache.hadoop.yarn.server.resourcemanager.resource.Resources.*;
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Scheduler metrics for a single queue, optionally broken down per user.
+ * Most update methods also forward the update to the per-user metrics
+ * (when user metrics are enabled) and to the parent queue's metrics
+ * (when a parent queue was supplied).
+ */
+@InterfaceAudience.Private
+@Metrics(context="yarn")
+public class QueueMetrics {
+  @Metric("# of apps submitted") MutableCounterInt appsSubmitted;
+  @Metric("# of running apps") MutableGaugeInt appsRunning;
+  @Metric("# of pending apps") MutableGaugeInt appsPending;
+  @Metric("# of apps completed") MutableCounterInt appsCompleted;
+  @Metric("# of apps killed") MutableCounterInt appsKilled;
+  @Metric("# of apps failed") MutableCounterInt appsFailed;
+
+  @Metric("Allocated memory in GiB") MutableGaugeInt allocatedGB;
+  @Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
+  @Metric("Available memory in GiB") MutableGaugeInt availableGB;
+  @Metric("Pending memory allocation in GiB") MutableGaugeInt pendingGB;
+  @Metric("# of pending containers") MutableGaugeInt pendingContainers;
+
+  static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
+  static final int GB = 1024; // resource.memory is in MB
+  static final MetricsInfo RECORD_INFO = info("SchedulerMetrics",
+      "Metrics for the resource scheduler");
+  static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue");
+  static final MetricsInfo USER_INFO = info("User", "Metrics by user");
+  // Splits a dotted queue path into trimmed, non-empty components.
+  static final Splitter Q_SPLITTER =
+      Splitter.on('.').omitEmptyStrings().trimResults();
+
+  // MSXXX: until metrics system handle this automatically
+  // First instance created through the short forQueue() form; handed back
+  // to later callers when running under a unit test.
+  private static QueueMetrics dummyMetrics = null;
+  private static boolean firstTime = true;
+
+  final MetricsRegistry registry;
+  final String queueName;
+  final QueueMetrics parent;
+  final MetricsSystem metricsSystem;
+  // Per-user metrics keyed by user name; null when user metrics are disabled.
+  private final Map<String, QueueMetrics> users;
+
+  /**
+   * @param ms metrics system used to register per-user sources; may be null
+   * @param queueName name of the queue these metrics describe
+   * @param parent parent queue whose metrics also receive updates, or null
+   * @param enableUserMetrics whether per-user metrics are tracked
+   */
+  QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
+               boolean enableUserMetrics) {
+    registry = new MetricsRegistry(RECORD_INFO);
+    this.queueName = queueName;
+    this.parent = parent != null ? parent.getMetrics() : null;
+    this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>()
+                                   : null;
+    metricsSystem = ms;
+  }
+
+  /**
+   * Adds a tag to this instance's registry.
+   * @return this instance, to allow chaining
+   */
+  QueueMetrics tag(MetricsInfo info, String value) {
+    registry.tag(info, value);
+    return this;
+  }
+
+  /**
+   * Builds the metrics source name for a queue: the record name followed by
+   * {@code ,q<i>=<component>} for each dot-separated component of
+   * {@code queueName}, numbered from 0.
+   * @param queueName dotted queue path, e.g. "root.leaf"
+   * @return a builder holding the source name, so callers can append to it
+   */
+  static StringBuilder sourceName(String queueName) {
+    StringBuilder sb = new StringBuilder(RECORD_INFO.name());
+    int i = 0;
+    for (String node : Q_SPLITTER.split(queueName)) {
+      sb.append(",q").append(i++).append('=').append(node);
+    }
+    return sb;
+  }
+
+  /**
+   * Creates queue metrics registered with the default metrics system.
+   * On the very first call, and on every call made outside a unit test, a
+   * new instance is created. Later calls made under a unit test return the
+   * first instance created by this method instead.
+   * @param queueName dotted queue path
+   * @param parent parent queue, or null
+   * @param enableUserMetrics whether per-user metrics are tracked
+   * @return the metrics instance to use for the queue
+   */
+  public synchronized
+  static QueueMetrics forQueue(String queueName, Queue parent,
+                               boolean enableUserMetrics) {
+    MetricsSystem ms = null;
+    if (firstTime) {
+      firstTime = false;
+      ms = DefaultMetricsSystem.instance();
+    } else if (!Self.isUnitTest()) {
+      ms = DefaultMetricsSystem.instance();
+    } else {
+      return dummyMetrics; // metrics specific tests should use the long form
+    }
+    QueueMetrics metrics = forQueue(ms, queueName, parent, enableUserMetrics);
+    if (dummyMetrics == null) {
+      dummyMetrics = metrics;
+    }
+    return metrics;
+  }
+
+  /**
+   * Creates queue metrics tagged with the queue name and, when {@code ms}
+   * is not null, registers them with {@code ms} under
+   * {@link #sourceName(String)}.
+   * @param ms metrics system to register with; may be null
+   * @param queueName dotted queue path
+   * @param parent parent queue, or null
+   * @param enableUserMetrics whether per-user metrics are tracked
+   * @return the new instance, or whatever {@code ms.register} returns
+   */
+  public static QueueMetrics forQueue(MetricsSystem ms, String queueName,
+                                      Queue parent, boolean enableUserMetrics) {
+    QueueMetrics metrics = new QueueMetrics(ms, queueName, parent,
+        enableUserMetrics).tag(QUEUE_INFO, queueName);
+    return ms == null ? metrics : ms.register(sourceName(queueName).toString(),
+        "Metrics for queue: " + queueName, metrics);
+  }
+
+  /**
+   * Returns the metrics of a user in this queue, creating them on first
+   * use. New per-user metrics are tagged with the queue and user names and
+   * are registered only when a metrics system is present, matching
+   * {@link #forQueue(MetricsSystem, String, Queue, boolean)}.
+   * @param userName the user to look up
+   * @return the user's metrics, or null when user metrics are disabled
+   */
+  synchronized QueueMetrics getUserMetrics(String userName) {
+    if (users == null) {
+      return null;
+    }
+    QueueMetrics metrics = users.get(userName);
+    if (metrics == null) {
+      metrics = new QueueMetrics(metricsSystem, queueName, null, false)
+          .tag(QUEUE_INFO, queueName).tag(USER_INFO, userName);
+      users.put(userName, metrics);
+      // A null metrics system is accepted by forQueue(ms, ...), so it must
+      // not cause a NullPointerException here either.
+      if (metricsSystem != null) {
+        metricsSystem.register(
+            sourceName(queueName).append(",user=").append(userName).toString(),
+            "Metrics for user '"+ userName +"' in queue '"+ queueName +"'",
+            metrics);
+      }
+    }
+    return metrics;
+  }
+
+  /**
+   * Records a newly submitted application: it counts as submitted and
+   * pending.
+   * @param user the submitting user
+   */
+  public void submitApp(String user) {
+    appsSubmitted.incr();
+    appsPending.incr();
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.submitApp(user);
+    }
+    if (parent != null) {
+      parent.submitApp(user);
+    }
+  }
+
+  /**
+   * Moves one application from pending to running.
+   * @param user the user owning the application
+   */
+  public void incrAppsRunning(String user) {
+    appsRunning.incr();
+    appsPending.decr();
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.incrAppsRunning(user);
+    }
+    if (parent != null) {
+      parent.incrAppsRunning(user);
+    }
+  }
+
+  /**
+   * Records the end of an application. KILLED and FAILED states bump the
+   * corresponding counters; every other state counts as completed. The
+   * application is removed from the pending or the running gauge depending
+   * on {@code app.isPending()}.
+   * @param app the finished application
+   */
+  public void finishApp(Application app) {
+    ApplicationState state = app.getState();
+    switch (state) {
+      case KILLED: appsKilled.incr(); break;
+      case FAILED: appsFailed.incr(); break;
+      default: appsCompleted.incr();  break;
+    }
+    if (app.isPending()) {
+      appsPending.decr();
+    } else {
+      appsRunning.decr();
+    }
+    QueueMetrics userMetrics = getUserMetrics(app.getUser());
+    if (userMetrics != null) {
+      userMetrics.finishApp(app);
+    }
+    if (parent != null) {
+      parent.finishApp(app);
+    }
+  }
+
+  /**
+   * Set available resources. To be called by scheduler periodically as
+   * resources become available. Not forwarded to user or parent metrics.
+   * @param mb memory in MB; stored in whole GB (integer division)
+   */
+  public void setAvailableQueueMemory(int mb) {
+    availableGB.set(mb/GB);
+  }
+
+  /**
+   * Set available resources of a user. To be called by scheduler
+   * periodically as resources become available. Does nothing when user
+   * metrics are disabled.
+   * @param user the user whose available memory is set
+   * @param mb memory in MB
+   */
+  public void setAvailableUserMemory(String user, int mb) {
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.setAvailableQueueMemory(mb);
+    }
+  }
+
+  /**
+   * Increment pending resource metrics
+   * @param user the requesting user
+   * @param containers number of containers added to the pending count
+   * @param res the TOTAL delta of resources note this is different from
+   *            the other APIs which use per container resource
+   */
+  public void incrPendingResources(String user, int containers, Resource res) {
+    _incrPendingResources(containers, res);
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.incrPendingResources(user, containers, res);
+    }
+    if (parent != null) {
+      parent.incrPendingResources(user, containers, res);
+    }
+  }
+
+  // Updates only this instance's pending gauges; res is the total delta.
+  private void _incrPendingResources(int containers, Resource res) {
+    pendingContainers.incr(containers);
+    pendingGB.incr(res.getMemory()/GB);
+  }
+
+  /**
+   * Decrement pending resource metrics
+   * @param user the requesting user
+   * @param containers number of containers removed from the pending count
+   * @param res the TOTAL delta of resources, as in
+   *            {@link #incrPendingResources(String, int, Resource)}
+   */
+  public void decrPendingResources(String user, int containers, Resource res) {
+    _decrPendingResources(containers, res);
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.decrPendingResources(user, containers, res);
+    }
+    if (parent != null) {
+      parent.decrPendingResources(user, containers, res);
+    }
+  }
+
+  // Updates only this instance's pending gauges; res is the total delta.
+  private void _decrPendingResources(int containers, Resource res) {
+    pendingContainers.decr(containers);
+    pendingGB.decr(res.getMemory()/GB);
+  }
+
+  /**
+   * Records an allocation: the allocated gauges go up and the pending
+   * gauges go down by the same number of containers.
+   * @param user the user receiving the containers
+   * @param containers number of containers allocated
+   * @param res resource of ONE container
+   */
+  public void allocateResources(String user, int containers, Resource res) {
+    allocatedContainers.incr(containers);
+    allocatedGB.incr(res.getMemory()/GB * containers);
+    // The pending helpers expect the total, so scale the per-container value.
+    _decrPendingResources(containers, multiply(res, containers));
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.allocateResources(user, containers, res);
+    }
+    if (parent != null) {
+      parent.allocateResources(user, containers, res);
+    }
+  }
+
+  /**
+   * Records a release: the allocated gauges go down. Pending gauges are
+   * not touched.
+   * @param user the user releasing the containers
+   * @param containers number of containers released
+   * @param res resource of ONE container
+   */
+  public void releaseResources(String user, int containers, Resource res) {
+    allocatedContainers.decr(containers);
+    allocatedGB.decr(res.getMemory()/GB * containers);
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.releaseResources(user, containers, res);
+    }
+    if (parent != null) {
+      parent.releaseResources(user, containers, res);
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java?rev=1098152&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
(added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
Sat Apr 30 20:34:49 2011
@@ -0,0 +1,195 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import static org.apache.hadoop.test.MetricsAsserts.*;
+import static org.apache.hadoop.test.MockitoMaker.*;
+import org.apache.hadoop.yarn.api.records.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for QueueMetrics. Each test drives one queue through submit,
+ * pending, running, allocate, release and finish, and checks the published
+ * metrics after every step. The long form of QueueMetrics.forQueue is used
+ * with a fresh MetricsSystemImpl per test.
+ */
+public class TestQueueMetrics {
+  static final int GB = 1024; // MB
+
+  MetricsSystem ms;
+
+  // A new metrics system per test keeps registered sources isolated.
+  @Before public void setup() {
+    ms = new MetricsSystemImpl();
+  }
+
+  // Single queue, user metrics disabled: only the queue source exists.
+  @Test public void testDefaultSingleQueueMetrics() {
+    String queueName = "single";
+    String user = "alice";
+
+    QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false);
+    MetricsSource queueSource= queueSource(ms, queueName);
+    Application app = mockApp(user);
+
+    metrics.submitApp(user);
+    MetricsSource userSource = userSource(ms, queueName, user);
+    checkApps(queueSource, 1, 1, 0, 0, 0, 0);
+
+    metrics.setAvailableQueueMemory(100*GB);
+    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
+    // Available resources is set externally, as it depends on dynamic
+    // configurable cluster/queue resources
+    checkResources(queueSource, 0, 0, 100, 15, 5);
+
+    metrics.incrAppsRunning(user);
+    checkApps(queueSource, 1, 0, 1, 0, 0, 0);
+
+    // 3 containers of 2 GB each: 6 GB allocated, pending drops to
+    // 15 - 6 = 9 GB and 5 - 3 = 2 containers.
+    metrics.allocateResources(user, 3, Resources.createResource(2*GB));
+    checkResources(queueSource, 6, 3, 100, 9, 2);
+
+    // Releasing one 2 GB container only lowers the allocated gauges.
+    metrics.releaseResources(user, 1, Resources.createResource(2*GB));
+    checkResources(queueSource, 4, 2, 100, 9, 2);
+
+    metrics.finishApp(app);
+    checkApps(queueSource, 1, 0, 0, 1, 0, 0);
+    // User metrics were disabled, so no per-user source was registered.
+    assertNull(userSource);
+  }
+
+  // Single queue with user metrics enabled: the per-user source must track
+  // the queue source, except for the separately set available memory.
+  @Test public void testSingleQueueWithUserMetrics() {
+    String queueName = "single2";
+    String user = "dodo";
+
+    QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true);
+    MetricsSource queueSource = queueSource(ms, queueName);
+    Application app = mockApp(user);
+
+    metrics.submitApp(user);
+    // The per-user source is looked up only after the first update for
+    // that user.
+    MetricsSource userSource = userSource(ms, queueName, user);
+
+    checkApps(queueSource, 1, 1, 0, 0, 0, 0);
+    checkApps(userSource, 1, 1, 0, 0, 0, 0);
+
+    metrics.setAvailableQueueMemory(100*GB);
+    metrics.setAvailableUserMemory(user, 10*GB);
+    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
+    // Available resources is set externally, as it depends on dynamic
+    // configurable cluster/queue resources
+    checkResources(queueSource, 0, 0, 100, 15, 5);
+    checkResources(userSource, 0, 0, 10, 15, 5);
+
+    metrics.incrAppsRunning(user);
+    checkApps(queueSource, 1, 0, 1, 0, 0, 0);
+    checkApps(userSource, 1, 0, 1, 0, 0, 0);
+
+    metrics.allocateResources(user, 3, Resources.createResource(2*GB));
+    checkResources(queueSource, 6, 3, 100, 9, 2);
+    checkResources(userSource, 6, 3, 10, 9, 2);
+
+    metrics.releaseResources(user, 1, Resources.createResource(2*GB));
+    checkResources(queueSource, 4, 2, 100, 9, 2);
+    checkResources(userSource, 4, 2, 10, 9, 2);
+
+    metrics.finishApp(app);
+    checkApps(queueSource, 1, 0, 0, 1, 0, 0);
+    checkApps(userSource, 1, 0, 0, 1, 0, 0);
+  }
+
+  // Parent and leaf queue, both with user metrics: updates made on the leaf
+  // must also show up in the parent's queue and user sources.
+  @Test public void testTwoLevelWithUserMetrics() {
+    String parentQueueName = "root";
+    String leafQueueName = "root.leaf";
+    String user = "alice";
+
+    QueueMetrics parentMetrics =
+        QueueMetrics.forQueue(ms, parentQueueName, null, true);
+    // Stub Queue whose getMetrics() returns the parent metrics.
+    Queue parentQueue = make(stub(Queue.class).returning(parentMetrics).
+        from.getMetrics());
+    QueueMetrics metrics =
+        QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true);
+    MetricsSource parentQueueSource = queueSource(ms, parentQueueName);
+    MetricsSource queueSource = queueSource(ms, leafQueueName);
+    Application app = mockApp(user);
+
+    metrics.submitApp(user);
+    MetricsSource userSource = userSource(ms, leafQueueName, user);
+    MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
+
+    checkApps(queueSource, 1, 1, 0, 0, 0, 0);
+    checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0);
+    checkApps(userSource, 1, 1, 0, 0, 0, 0);
+    checkApps(parentUserSource, 1, 1, 0, 0, 0, 0);
+
+    // Available memory is not forwarded to the parent, so it is set on
+    // both levels explicitly.
+    parentMetrics.setAvailableQueueMemory(100*GB);
+    metrics.setAvailableQueueMemory(100*GB);
+    parentMetrics.setAvailableUserMemory(user, 10*GB);
+    metrics.setAvailableUserMemory(user, 10*GB);
+    metrics.incrPendingResources(user, 5, Resources.createResource(15*GB));
+    checkResources(queueSource, 0, 0, 100, 15, 5);
+    checkResources(parentQueueSource, 0, 0, 100, 15, 5);
+    checkResources(userSource, 0, 0, 10, 15, 5);
+    checkResources(parentUserSource, 0, 0, 10, 15, 5);
+
+    metrics.incrAppsRunning(user);
+    checkApps(queueSource, 1, 0, 1, 0, 0, 0);
+    checkApps(userSource, 1, 0, 1, 0, 0, 0);
+
+    metrics.allocateResources(user, 3, Resources.createResource(2*GB));
+    // Available resources is set externally, as it depends on dynamic
+    // configurable cluster/queue resources
+    checkResources(queueSource, 6, 3, 100, 9, 2);
+    checkResources(parentQueueSource, 6, 3, 100, 9, 2);
+    checkResources(userSource, 6, 3, 10, 9, 2);
+    checkResources(parentUserSource, 6, 3, 10, 9, 2);
+
+    metrics.releaseResources(user, 1, Resources.createResource(2*GB));
+    checkResources(queueSource, 4, 2, 100, 9, 2);
+    checkResources(parentQueueSource, 4, 2, 100, 9, 2);
+    checkResources(userSource, 4, 2, 10, 9, 2);
+    checkResources(parentUserSource, 4, 2, 10, 9, 2);
+
+    metrics.finishApp(app);
+    checkApps(queueSource, 1, 0, 0, 1, 0, 0);
+    checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0);
+    checkApps(userSource, 1, 0, 0, 1, 0, 0);
+    checkApps(parentUserSource, 1, 0, 0, 1, 0, 0);
+  }
+
+  /**
+   * Asserts the application metrics of a source. Expected values are given
+   * in the order submitted, pending, running, completed, failed, killed.
+   */
+  public static void checkApps(MetricsSource source, int submitted, int pending,
+      int running, int completed, int failed, int killed) {
+    MetricsRecordBuilder rb = getMetrics(source);
+    assertCounter("AppsSubmitted", submitted, rb);
+    assertGauge("AppsPending", pending, rb);
+    assertGauge("AppsRunning", running, rb);
+    assertCounter("AppsCompleted", completed, rb);
+    assertCounter("AppsFailed", failed, rb);
+    assertCounter("AppsKilled", killed, rb);
+  }
+
+  /**
+   * Asserts the resource gauges of a source. Expected values are given in
+   * the order allocated GB, allocated containers, available GB, pending GB,
+   * pending containers.
+   */
+  public static void checkResources(MetricsSource source, int allocGB,
+      int allocCtnrs, int availGB, int pendingGB, int pendingCtnrs) {
+    MetricsRecordBuilder rb = getMetrics(source);
+    assertGauge("AllocatedGB", allocGB, rb);
+    assertGauge("AllocatedContainers", allocCtnrs, rb);
+    assertGauge("AvailableGB", availGB, rb);
+    assertGauge("PendingGB", pendingGB, rb);
+    assertGauge("PendingContainers", pendingCtnrs, rb);
+  }
+
+  // Mock application in RUNNING state owned by the given user. isPending()
+  // is left unstubbed, so the mock reports Mockito's default (false) and
+  // finishApp treats the application as running.
+  private static Application mockApp(String user) {
+    Application app = mock(Application.class);
+    when(app.getState()).thenReturn(ApplicationState.RUNNING);
+    when(app.getUser()).thenReturn(user);
+    return app;
+  }
+
+  /**
+   * Looks up the metrics source registered for a queue, using the name
+   * produced by QueueMetrics.sourceName.
+   */
+  public static MetricsSource queueSource(MetricsSystem ms, String queue) {
+    MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue).toString());
+    return s;
+  }
+
+  /**
+   * Looks up the metrics source registered for a user in a queue: the queue
+   * source name with ",user=" and the user name appended.
+   */
+  public static MetricsSource userSource(MetricsSystem ms, String queue,
+                                         String user) {
+    MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue).
+        append(",user=").append(user).toString());
+    return s;
+  }
+}



Mime
View raw message