hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From x...@apache.org
Subject [08/50] [abbrv] hadoop git commit: YARN-8213. Add Capacity Scheduler performance metrics. (Weiwei Yang via wangda)
Date Thu, 31 May 2018 15:50:02 GMT
YARN-8213. Add Capacity Scheduler performance metrics. (Weiwei Yang via wangda)

Change-Id: Ieea6f3eeb83c90cd74233fea896f0fcd0f325d5f


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f24c842d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f24c842d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f24c842d

Branch: refs/heads/HDDS-4
Commit: f24c842d52e166e8566337ef93c96438f1c870d8
Parents: 8605a38
Author: Wangda Tan <wangda@apache.org>
Authored: Fri May 25 21:53:20 2018 -0700
Committer: Wangda Tan <wangda@apache.org>
Committed: Fri May 25 21:53:20 2018 -0700

----------------------------------------------------------------------
 .../server/resourcemanager/ResourceManager.java |   1 +
 .../scheduler/AbstractYarnScheduler.java        |   5 +
 .../scheduler/ResourceScheduler.java            |   5 +
 .../scheduler/capacity/CapacityScheduler.java   |  31 ++++-
 .../capacity/CapacitySchedulerMetrics.java      | 119 +++++++++++++++++++
 .../TestCapacitySchedulerMetrics.java           | 110 +++++++++++++++++
 6 files changed, 269 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24c842d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 05745ec..c533111 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -1216,6 +1216,7 @@ public class ResourceManager extends CompositeService implements Recoverable
{
   void reinitialize(boolean initialize) {
     ClusterMetrics.destroy();
     QueueMetrics.clearQueueMetrics();
+    getResourceScheduler().resetSchedulerMetrics();
     if (initialize) {
       resetRMContext();
       createAndInitActiveServices(true);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24c842d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index b2747f7..18c7b4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -1464,4 +1464,9 @@ public abstract class AbstractYarnScheduler
       SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) {
     return false;
   }
+
+  @Override
+  public void resetSchedulerMetrics() {
+    // reset scheduler metrics
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24c842d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
index 5a56ac7..dcb6edd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java
@@ -71,4 +71,9 @@ public interface ResourceScheduler extends YarnScheduler, Recoverable {
    */
   boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
       SchedulingRequest schedulingRequest, SchedulerNode schedulerNode);
+
+  /**
+   * Reset scheduler metrics.
+   */
+  void resetSchedulerMetrics();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24c842d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 162d3bb..1c9bf6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1252,6 +1252,7 @@ public class CapacityScheduler extends
 
   @Override
   protected void nodeUpdate(RMNode rmNode) {
+    long begin = System.nanoTime();
     try {
       readLock.lock();
       setLastNodeUpdateTime(Time.now());
@@ -1279,6 +1280,9 @@ public class CapacityScheduler extends
         writeLock.unlock();
       }
     }
+
+    long latency = System.nanoTime() - begin;
+    CapacitySchedulerMetrics.getMetrics().addNodeUpdate(latency);
   }
 
   /**
@@ -1643,17 +1647,28 @@ public class CapacityScheduler extends
       return null;
     }
 
+    long startTime = System.nanoTime();
+
     // Backward compatible way to make sure previous behavior which allocation
     // driven by node heartbeat works.
     FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
     // We have two different logics to handle allocation on single node / multi
     // nodes.
+    CSAssignment assignment;
     if (null != node) {
-      return allocateContainerOnSingleNode(candidates, node, withNodeHeartbeat);
+      assignment = allocateContainerOnSingleNode(candidates,
+          node, withNodeHeartbeat);
     } else{
-      return allocateContainersOnMultiNodes(candidates);
+      assignment = allocateContainersOnMultiNodes(candidates);
+    }
+
+    if (assignment != null && assignment.getAssignmentInformation() != null
+        && assignment.getAssignmentInformation().getNumAllocations() > 0) {
+      long allocateTime = System.nanoTime() - startTime;
+      CapacitySchedulerMetrics.getMetrics().addAllocate(allocateTime);
     }
+    return assignment;
   }
 
   @Override
@@ -2806,6 +2821,7 @@ public class CapacityScheduler extends
   @Override
   public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
       boolean updatePending) {
+    long commitStart = System.nanoTime();
     ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
         (ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>) r;
 
@@ -2844,9 +2860,15 @@ public class CapacityScheduler extends
       if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
         if (app.accept(cluster, request, updatePending)
             && app.apply(cluster, request, updatePending)) {
+          long commitSuccess = System.nanoTime() - commitStart;
+          CapacitySchedulerMetrics.getMetrics()
+              .addCommitSuccess(commitSuccess);
           LOG.info("Allocation proposal accepted");
           isSuccess = true;
         } else{
+          long commitFailed = System.nanoTime() - commitStart;
+          CapacitySchedulerMetrics.getMetrics()
+              .addCommitFailure(commitFailed);
           LOG.info("Failed to accept allocation proposal");
         }
 
@@ -3029,4 +3051,9 @@ public class CapacityScheduler extends
     }
     return autoCreatedLeafQueue;
   }
+
+  @Override
+  public void resetSchedulerMetrics() {
+    CapacitySchedulerMetrics.destroy();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24c842d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java
new file mode 100644
index 0000000..5f8988b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerMetrics.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * Metrics for capacity scheduler.
+ */
+@InterfaceAudience.Private
+@Metrics(context="yarn")
+public class CapacitySchedulerMetrics {
+
+  private static AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+  private static final MetricsInfo RECORD_INFO =
+      info("CapacitySchedulerMetrics",
+          "Metrics for the Yarn Capacity Scheduler");
+
+  @Metric("Scheduler allocate containers") MutableRate allocate;
+  @Metric("Scheduler commit success") MutableRate commitSuccess;
+  @Metric("Scheduler commit failure") MutableRate commitFailure;
+  @Metric("Scheduler node update") MutableRate nodeUpdate;
+
+  private static volatile CapacitySchedulerMetrics INSTANCE = null;
+  private static MetricsRegistry registry;
+
+  public static CapacitySchedulerMetrics getMetrics() {
+    if(!isInitialized.get()){
+      synchronized (CapacitySchedulerMetrics.class) {
+        if(INSTANCE == null){
+          INSTANCE = new CapacitySchedulerMetrics();
+          registerMetrics();
+          isInitialized.set(true);
+        }
+      }
+    }
+    return INSTANCE;
+  }
+
+  private static void registerMetrics() {
+    registry = new MetricsRegistry(RECORD_INFO);
+    registry.tag(RECORD_INFO, "ResourceManager");
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    if (ms != null) {
+      ms.register("CapacitySchedulerMetrics",
+          "Metrics for the Yarn Capacity Scheduler", INSTANCE);
+    }
+  }
+
+  @VisibleForTesting
+  public synchronized static void destroy() {
+    isInitialized.set(false);
+    INSTANCE = null;
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    if (ms != null) {
+      ms.unregisterSource("CapacitySchedulerMetrics");
+    }
+  }
+
+  public void addAllocate(long latency) {
+    this.allocate.add(latency);
+  }
+
+  public void addCommitSuccess(long latency) {
+    this.commitSuccess.add(latency);
+  }
+
+  public void addCommitFailure(long latency) {
+    this.commitFailure.add(latency);
+  }
+
+  public void addNodeUpdate(long latency) {
+    this.nodeUpdate.add(latency);
+  }
+
+  @VisibleForTesting
+  public long getNumOfNodeUpdate() {
+    return this.nodeUpdate.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumOfAllocates() {
+    return this.allocate.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumOfCommitSuccess() {
+    return this.commitSuccess.lastStat().numSamples();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f24c842d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java
new file mode 100644
index 0000000..eaa966a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestCapacitySchedulerMetrics.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerMetrics;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test class for CS metrics.
+ */
+public class TestCapacitySchedulerMetrics {
+
+  private MockRM rm;
+
+  @Test
+  public void testCSMetrics() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
+
+    RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    rm = new MockRM(conf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 2048);
+    MockNM nm2 = rm.registerNode("host2:1234", 2048);
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    CapacitySchedulerMetrics csMetrics = CapacitySchedulerMetrics.getMetrics();
+    Assert.assertNotNull(csMetrics);
+    try {
+      GenericTestUtils.waitFor(()
+          -> csMetrics.getNumOfNodeUpdate() == 2, 100, 3000);
+    } catch(TimeoutException e) {
+      Assert.fail("CS metrics not updated on node-update events.");
+    }
+
+    Assert.assertEquals(0, csMetrics.getNumOfAllocates());
+    Assert.assertEquals(0, csMetrics.getNumOfCommitSuccess());
+
+    RMApp rmApp = rm.submitApp(1024, "app", "user", null, false,
+        "default", 1, null, null, false);
+    MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
+    am.registerAppAttempt();
+    am.allocate("*", 1024, 1, new ArrayList<>());
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+
+    // Verify HB metrics updated
+    try {
+      GenericTestUtils.waitFor(()
+          -> csMetrics.getNumOfNodeUpdate() == 4, 100, 3000);
+    } catch(TimeoutException e) {
+      Assert.fail("CS metrics not updated on node-update events.");
+    }
+
+    // For async mode, the number of alloc might be bigger than 1
+    Assert.assertTrue(csMetrics.getNumOfAllocates() > 0);
+    // But there will be only 2 successful commit (1 AM + 1 task)
+    Assert.assertEquals(2, csMetrics.getNumOfCommitSuccess());
+  }
+
+  @After
+  public void tearDown() {
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message